ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [12/15] ignite git commit: zk
Date Fri, 12 Jan 2018 09:53:42 GMT
zk


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4805be06
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4805be06
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4805be06

Branch: refs/heads/ignite-zk
Commit: 4805be06f9c374a43781ea6c547478ddbdd18c35
Parents: f53d857
Author: sboikov <sboikov@gridgain.com>
Authored: Fri Jan 12 12:27:44 2018 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Fri Jan 12 12:27:45 2018 +0300

----------------------------------------------------------------------
 modules/core/pom.xml                            |   14 -
 .../org/apache/ignite/internal/IgnitionEx.java  |   78 -
 .../spi/discovery/zk/ZookeeperDiscoverySpi.java |  538 --
 .../zk/internal/ZkAbstractCallabck.java         |   83 -
 .../zk/internal/ZkAbstractChildrenCallback.java |   61 -
 .../zk/internal/ZkAbstractWatcher.java          |   55 -
 .../discovery/zk/internal/ZkAliveNodeData.java  |   40 -
 .../zk/internal/ZkBulkJoinContext.java          |   50 -
 .../discovery/zk/internal/ZkClusterNodes.java   |  103 -
 .../internal/ZkCommunicationErrorNodeState.java |   46 -
 .../ZkCommunicationErrorProcessFuture.java      |  411 --
 ...kCommunicationErrorResolveFinishMessage.java |   69 -
 .../ZkCommunicationErrorResolveResult.java      |   45 -
 ...ZkCommunicationErrorResolveStartMessage.java |   61 -
 .../internal/ZkCommunicationFailureContext.java |  188 -
 .../zk/internal/ZkDiscoveryCustomEventData.java |   89 -
 .../zk/internal/ZkDiscoveryEventData.java       |  165 -
 .../zk/internal/ZkDiscoveryEventsData.java      |  121 -
 .../internal/ZkDiscoveryNodeFailEventData.java  |   55 -
 .../internal/ZkDiscoveryNodeJoinEventData.java  |   60 -
 .../ZkDistributedCollectDataFuture.java         |  243 -
 .../zk/internal/ZkForceNodeFailMessage.java     |   66 -
 .../discovery/zk/internal/ZkIgnitePaths.java    |  378 --
 .../zk/internal/ZkInternalJoinErrorMessage.java |   44 -
 .../zk/internal/ZkInternalMessage.java          |   27 -
 .../zk/internal/ZkJoinEventDataForJoined.java   |   83 -
 .../zk/internal/ZkJoinedNodeEvtData.java        |   79 -
 .../zk/internal/ZkJoiningNodeData.java          |   87 -
 .../zk/internal/ZkNoServersMessage.java         |   50 -
 .../zk/internal/ZkNodeValidateResult.java       |   43 -
 .../spi/discovery/zk/internal/ZkRunnable.java   |   51 -
 .../discovery/zk/internal/ZkRuntimeState.java   |  132 -
 .../discovery/zk/internal/ZkTimeoutObject.java  |   54 -
 .../discovery/zk/internal/ZookeeperClient.java  | 1196 -----
 .../ZookeeperClientFailedException.java         |   40 -
 .../zk/internal/ZookeeperClusterNode.java       |  362 --
 .../zk/internal/ZookeeperDiscoveryImpl.java     | 4483 ----------------
 .../IgniteClientReconnectAbstractTest.java      |    8 +-
 .../zk/internal/ZookeeperClientTest.java        |  480 --
 .../zk/internal/ZookeeperDiscoverySpiTest.java  | 4869 ------------------
 .../testframework/junits/GridAbstractTest.java  |   37 +-
 .../spi/discovery/zk/ZookeeperDiscoverySpi.java |  538 ++
 .../zk/internal/ZkAbstractCallabck.java         |   83 +
 .../zk/internal/ZkAbstractChildrenCallback.java |   61 +
 .../zk/internal/ZkAbstractWatcher.java          |   55 +
 .../discovery/zk/internal/ZkAliveNodeData.java  |   40 +
 .../zk/internal/ZkBulkJoinContext.java          |   50 +
 .../discovery/zk/internal/ZkClusterNodes.java   |  103 +
 .../internal/ZkCommunicationErrorNodeState.java |   46 +
 .../ZkCommunicationErrorProcessFuture.java      |  411 ++
 ...kCommunicationErrorResolveFinishMessage.java |   69 +
 .../ZkCommunicationErrorResolveResult.java      |   45 +
 ...ZkCommunicationErrorResolveStartMessage.java |   61 +
 .../internal/ZkCommunicationFailureContext.java |  188 +
 .../zk/internal/ZkDiscoveryCustomEventData.java |   89 +
 .../zk/internal/ZkDiscoveryEventData.java       |  165 +
 .../zk/internal/ZkDiscoveryEventsData.java      |  121 +
 .../internal/ZkDiscoveryNodeFailEventData.java  |   55 +
 .../internal/ZkDiscoveryNodeJoinEventData.java  |   60 +
 .../ZkDistributedCollectDataFuture.java         |  243 +
 .../zk/internal/ZkForceNodeFailMessage.java     |   66 +
 .../discovery/zk/internal/ZkIgnitePaths.java    |  378 ++
 .../zk/internal/ZkInternalJoinErrorMessage.java |   44 +
 .../zk/internal/ZkInternalMessage.java          |   27 +
 .../zk/internal/ZkJoinEventDataForJoined.java   |   83 +
 .../zk/internal/ZkJoinedNodeEvtData.java        |   79 +
 .../zk/internal/ZkJoiningNodeData.java          |   87 +
 .../zk/internal/ZkNoServersMessage.java         |   50 +
 .../zk/internal/ZkNodeValidateResult.java       |   43 +
 .../spi/discovery/zk/internal/ZkRunnable.java   |   51 +
 .../discovery/zk/internal/ZkRuntimeState.java   |  132 +
 .../discovery/zk/internal/ZkTimeoutObject.java  |   54 +
 .../discovery/zk/internal/ZookeeperClient.java  | 1196 +++++
 .../ZookeeperClientFailedException.java         |   40 +
 .../zk/internal/ZookeeperClusterNode.java       |  362 ++
 .../zk/internal/ZookeeperDiscoveryImpl.java     | 4483 ++++++++++++++++
 .../zk/ZookeeperDiscoverySpiTestSuite2.java     |   13 +-
 .../zk/internal/ZookeeperClientTest.java        |  480 ++
 .../zk/internal/ZookeeperDiscoverySpiTest.java  | 4867 +++++++++++++++++
 79 files changed, 15015 insertions(+), 15147 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/4805be06/modules/core/pom.xml
----------------------------------------------------------------------
diff --git a/modules/core/pom.xml b/modules/core/pom.xml
index ecbc542..5ddc8ce 100644
--- a/modules/core/pom.xml
+++ b/modules/core/pom.xml
@@ -74,20 +74,6 @@
             <version>${jetbrains.annotations.version}</version>
         </dependency>
 
-        <!-- TODO ZK -->
-        <dependency>
-            <groupId>org.apache.zookeeper</groupId>
-            <artifactId>zookeeper</artifactId>
-            <version>${zookeeper.version}</version>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.curator</groupId>
-            <artifactId>curator-test</artifactId>
-            <version>${curator.version}</version>
-        </dependency>
-        <!-- TODO ZK -->
-
         <dependency>
             <groupId>mx4j</groupId>
             <artifactId>mx4j-tools</artifactId>

http://git-wip-us.apache.org/repos/asf/ignite/blob/4805be06/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
index 36b3ce6..dd9670e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
@@ -105,7 +105,6 @@ import org.apache.ignite.spi.deployment.local.LocalDeploymentSpi;
 import org.apache.ignite.spi.discovery.DiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder;
-import org.apache.ignite.spi.discovery.zk.ZookeeperDiscoverySpi;
 import org.apache.ignite.spi.eventstorage.NoopEventStorageSpi;
 import org.apache.ignite.spi.failover.always.AlwaysFailoverSpi;
 import org.apache.ignite.spi.indexing.noop.NoopIndexingSpi;
@@ -157,67 +156,6 @@ import static org.apache.ignite.plugin.segmentation.SegmentationPolicy.RESTART_J
  * GridConfiguration cfg = new GridConfiguration();
  */
 public class IgnitionEx {
-    /** */
-    // TODO ZK
-    public static volatile boolean TEST_ZK = IgniteSystemProperties.getBoolean("TEST_ZK", false);
-
-    /** */
-    public static TestingCluster zkCluster;
-
-    synchronized static void startZk() {
-        if (TEST_ZK && zkCluster == null) {
-            System.out.println("Start ZK cluster for tests");
-
-            zkCluster = createTestingCluster(1);
-
-            try {
-                System.setProperty("zookeeper.forceSync", "false");
-                zkCluster.start();
-
-                System.out.println("ZK cluster started: " + zkCluster.getConnectString());
-            }
-            catch (Exception e) {
-                e.printStackTrace();
-            }
-        }
-    }
-
-    private static TestingCluster createTestingCluster(int instances) {
-        String tmpDir = System.getProperty("java.io.tmpdir");
-
-        List<InstanceSpec> specs = new ArrayList<>();
-
-        for (int i = 0; i < instances; i++) {
-            File file = new File(tmpDir, "apacheIgniteTestZk-" + i);
-
-            if (file.isDirectory())
-                deleteRecursively0(file);
-            else {
-                if (!file.mkdirs())
-                    throw new IgniteException("Failed to create directory for test Zookeeper server: " + file.getAbsolutePath());
-            }
-
-
-            specs.add(new InstanceSpec(file, -1, -1, -1, true, -1, -1, 500));
-        }
-
-        return new TestingCluster(specs);
-    }
-
-    private static void deleteRecursively0(File file) {
-        File[] files = file.listFiles();
-
-        if (files == null)
-            return;
-
-        for (File f : files) {
-            if (f.isDirectory())
-                deleteRecursively0(f);
-            else
-                f.delete();
-        }
-    }
-
     /** Default configuration path relative to Ignite home. */
     public static final String DFLT_CFG = "config/default-config.xml";
 
@@ -2290,22 +2228,6 @@ public class IgnitionEx {
 
             initializeDataStorageConfiguration(myCfg);
 
-            if (TEST_ZK) {
-                startZk();
-
-                ZookeeperDiscoverySpi zkSpi = new ZookeeperDiscoverySpi();
-
-                DiscoverySpi spi = myCfg.getDiscoverySpi();
-
-                if (spi instanceof TcpDiscoverySpi)
-                    zkSpi.setClientReconnectDisabled(((TcpDiscoverySpi)spi).isClientReconnectDisabled());
-
-                zkSpi.setSessionTimeout(20_000);
-                zkSpi.setZkConnectionString(zkCluster.getConnectString());
-
-                myCfg.setDiscoverySpi(zkSpi);
-            }
-
             return myCfg;
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/4805be06/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
deleted file mode 100644
index fc1af6a..0000000
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
+++ /dev/null
@@ -1,538 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.spi.discovery.zk;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.net.InetAddress;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi;
-import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpiInternalListener;
-import org.apache.ignite.internal.util.tostring.GridToStringExclude;
-import org.apache.ignite.internal.util.tostring.GridToStringInclude;
-import org.apache.ignite.internal.util.typedef.internal.A;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteBiTuple;
-import org.apache.ignite.lang.IgniteProductVersion;
-import org.apache.ignite.resources.LoggerResource;
-import org.apache.ignite.spi.IgniteSpiAdapter;
-import org.apache.ignite.spi.IgniteSpiConfiguration;
-import org.apache.ignite.spi.IgniteSpiContext;
-import org.apache.ignite.spi.IgniteSpiException;
-import org.apache.ignite.spi.IgniteSpiMultipleInstancesSupport;
-import org.apache.ignite.spi.communication.CommunicationSpi;
-import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
-import org.apache.ignite.spi.discovery.DiscoveryMetricsProvider;
-import org.apache.ignite.spi.discovery.DiscoverySpi;
-import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
-import org.apache.ignite.spi.discovery.DiscoverySpiDataExchange;
-import org.apache.ignite.spi.discovery.DiscoverySpiHistorySupport;
-import org.apache.ignite.spi.discovery.DiscoverySpiListener;
-import org.apache.ignite.spi.discovery.DiscoverySpiMutableCustomMessageSupport;
-import org.apache.ignite.spi.discovery.DiscoverySpiNodeAuthenticator;
-import org.apache.ignite.spi.discovery.DiscoverySpiOrderSupport;
-import org.apache.ignite.spi.discovery.zk.internal.ZookeeperClusterNode;
-import org.apache.ignite.spi.discovery.zk.internal.ZookeeperDiscoveryImpl;
-import org.jetbrains.annotations.Nullable;
-
-import static org.apache.ignite.IgniteSystemProperties.IGNITE_CONSISTENT_ID_BY_HOST_WITHOUT_PORT;
-import static org.apache.ignite.IgniteSystemProperties.getBoolean;
-
-/**
- *
- */
-@IgniteSpiMultipleInstancesSupport(true)
-@DiscoverySpiOrderSupport(true)
-@DiscoverySpiHistorySupport(true)
-@DiscoverySpiMutableCustomMessageSupport(false)
-public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, IgniteDiscoverySpi {
-    /** */
-    public static final String DFLT_ROOT_PATH = "/apacheIgnite";
-
-    /** */
-    @GridToStringInclude
-    private String zkRootPath = DFLT_ROOT_PATH;
-
-    /** */
-    @GridToStringInclude
-    private String zkConnectionString;
-
-    /** */
-    private long joinTimeout = 30_000; // TODO ZK
-
-    /** */
-    @GridToStringInclude
-    private long sesTimeout;
-
-    /** */
-    private boolean clientReconnectDisabled;
-
-    /** */
-    @GridToStringExclude
-    private DiscoverySpiListener lsnr;
-
-    /** */
-    @GridToStringExclude
-    private DiscoverySpiDataExchange exchange;
-
-    /** */
-    @GridToStringExclude
-    private DiscoverySpiNodeAuthenticator nodeAuth;
-
-    /** */
-    @GridToStringExclude
-    private DiscoveryMetricsProvider metricsProvider;
-
-    /** */
-    @GridToStringExclude
-    private ZookeeperDiscoveryImpl impl;
-
-    /** */
-    @GridToStringExclude
-    private Map<String, Object> locNodeAttrs;
-
-    /** */
-    @GridToStringExclude
-    private IgniteProductVersion locNodeVer;
-
-    /** */
-    @GridToStringExclude
-    private Serializable consistentId;
-
-    /** Local node addresses. */
-    private IgniteBiTuple<Collection<String>, Collection<String>> addrs;
-
-    /** */
-    @LoggerResource
-    @GridToStringExclude
-    private IgniteLogger log;
-
-    /** */
-    private IgniteDiscoverySpiInternalListener internalLsnr;
-
-    /**
-     * @return Base path in ZK for znodes created by SPI.
-     */
-    public String getZkRootPath() {
-        return zkRootPath;
-    }
-
-    /**
-     * @param zkRootPath Base path in ZooKeeper for znodes created by SPI.
-     * @return {@code this} for chaining.
-     */
-    @IgniteSpiConfiguration(optional = true)
-    public ZookeeperDiscoverySpi setZkRootPath(String zkRootPath) {
-        this.zkRootPath = zkRootPath;
-
-        return this;
-    }
-
-    /**
-     * @return ZooKeeper session timeout.
-     */
-    public long getSessionTimeout() {
-        return sesTimeout;
-    }
-
-    /**
-     * @param sesTimeout ZooKeeper session timeout.
-     * @return {@code this} for chaining.
-     */
-    @IgniteSpiConfiguration(optional = true)
-    public ZookeeperDiscoverySpi setSessionTimeout(long sesTimeout) {
-        this.sesTimeout = sesTimeout;
-
-        return this;
-    }
-
-    /**
-     * @return Cluster join timeout.
-     */
-    public long getJoinTimeout() {
-        return joinTimeout;
-    }
-
-    /**
-     * @param joinTimeout Cluster join timeout ({@code 0} means wait forever).
-     * @return {@code this} for chaining.
-     */
-    @IgniteSpiConfiguration(optional = true)
-    public ZookeeperDiscoverySpi setJoinTimeout(long joinTimeout) {
-        this.joinTimeout = joinTimeout;
-
-        return this;
-    }
-
-    /**
-     * @return ZooKeeper connection string
-     */
-    public String getZkConnectionString() {
-        return zkConnectionString;
-    }
-
-    /**
-     * @param zkConnectionString ZooKeeper connection string
-     * @return {@code this} for chaining.
-     */
-    @IgniteSpiConfiguration(optional = false)
-    public ZookeeperDiscoverySpi setZkConnectionString(String zkConnectionString) {
-        this.zkConnectionString = zkConnectionString;
-
-        return this;
-    }
-
-    /**
-     * If {@code true} client does not try to reconnect.
-     *
-     * @return Client reconnect disabled flag.
-     */
-    public boolean isClientReconnectDisabled() {
-        return clientReconnectDisabled;
-    }
-
-    /**
-     * Sets client reconnect disabled flag.
-     *
-     * @param clientReconnectDisabled Client reconnect disabled flag.
-     * @return {@code this} for chaining.
-     */
-    @IgniteSpiConfiguration(optional = true)
-    public ZookeeperDiscoverySpi setClientReconnectDisabled(boolean clientReconnectDisabled) {
-        this.clientReconnectDisabled = clientReconnectDisabled;
-
-        return this;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean clientReconnectSupported() {
-        return !clientReconnectDisabled;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void clientReconnect() {
-        impl.reconnect();
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean knownNode(UUID nodeId) {
-        return impl.knownNode(nodeId);
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean supportsCommunicationFailureResolve() {
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void resolveCommunicationFailure(ClusterNode node, Exception err) {
-        impl.resolveCommunicationError(node, err);
-    }
-
-    /** {@inheritDoc} */
-    @Nullable @Override public Serializable consistentId() throws IgniteSpiException {
-        if (consistentId == null) {
-            consistentId = ignite.configuration().getConsistentId();
-
-            if (consistentId == null) {
-                initAddresses();
-
-                final List<String> sortedAddrs = new ArrayList<>(addrs.get1());
-
-                Collections.sort(sortedAddrs);
-
-                if (getBoolean(IGNITE_CONSISTENT_ID_BY_HOST_WITHOUT_PORT))
-                    consistentId = U.consistentId(sortedAddrs);
-                else {
-                    Integer commPort = null;
-
-                    if (locNodeAttrs != null) {
-                        commPort = (Integer)locNodeAttrs.get(
-                            TcpCommunicationSpi.class.getSimpleName() + "." + TcpCommunicationSpi.ATTR_PORT);
-                    }
-                    else {
-                        CommunicationSpi commSpi = ignite.configuration().getCommunicationSpi();
-
-                        if (commSpi instanceof TcpCommunicationSpi) {
-                            commPort = ((TcpCommunicationSpi)commSpi).boundPort();
-
-                            if (commPort == -1)
-                                commPort = null;
-                        }
-                    }
-
-                    if (commPort == null) {
-                        U.warn(log, "Can not initialize default consistentId, TcpCommunicationSpi port is not initialized.");
-
-                        consistentId = ignite.configuration().getNodeId();
-                    }
-                    else
-                        consistentId = U.consistentId(sortedAddrs, commPort);
-                }
-            }
-        }
-
-        return consistentId;
-    }
-
-    /**
-     *
-     */
-    private void initAddresses() {
-        if (addrs == null) {
-            String locHost = ignite != null ? ignite.configuration().getLocalHost() : null;
-
-            InetAddress locAddr;
-
-            try {
-                locAddr = U.resolveLocalHost(locHost);
-            }
-            catch (IOException e) {
-                throw new IgniteSpiException("Unknown local address: " + locHost, e);
-            }
-
-            try {
-                addrs = U.resolveLocalAddresses(locAddr);
-            }
-            catch (Exception e) {
-                throw new IgniteSpiException("Failed to resolve local host to set of external addresses: " + locHost,
-                    e);
-            }
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public Collection<ClusterNode> getRemoteNodes() {
-        return impl.remoteNodes();
-    }
-
-    /** {@inheritDoc} */
-    @Override public ClusterNode getLocalNode() {
-        return impl != null ? impl.localNode() : null;
-    }
-
-    /** {@inheritDoc} */
-    @Nullable @Override public ClusterNode getNode(UUID nodeId) {
-        return impl.node(nodeId);
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean pingNode(UUID nodeId) {
-        return impl.pingNode(nodeId);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void setNodeAttributes(Map<String, Object> attrs, IgniteProductVersion ver) {
-        assert locNodeAttrs == null;
-        assert locNodeVer == null;
-
-        if (log.isDebugEnabled()) {
-            log.debug("Node attributes to set: " + attrs);
-            log.debug("Node version to set: " + ver);
-        }
-
-        locNodeAttrs = attrs;
-        locNodeVer = ver;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void setListener(@Nullable DiscoverySpiListener lsnr) {
-        this.lsnr = lsnr;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void setDataExchange(DiscoverySpiDataExchange exchange) {
-        this.exchange = exchange;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void setMetricsProvider(DiscoveryMetricsProvider metricsProvider) {
-        this.metricsProvider = metricsProvider;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void disconnect() throws IgniteSpiException {
-        impl.stop();
-    }
-
-    /** {@inheritDoc} */
-    @Override public void setAuthenticator(DiscoverySpiNodeAuthenticator auth) {
-        this.nodeAuth = auth;
-    }
-
-    /**
-     * @return Authenticator.
-     */
-    public DiscoverySpiNodeAuthenticator getAuthenticator() {
-        return nodeAuth;
-    }
-
-    /** {@inheritDoc} */
-    @Override public long getGridStartTime() {
-        return impl.gridStartTime();
-    }
-
-    /** {@inheritDoc} */
-    @Override public void sendCustomEvent(DiscoverySpiCustomMessage msg) {
-        IgniteDiscoverySpiInternalListener internalLsnr = impl.internalLsnr;
-
-        if (internalLsnr != null) {
-            if (!internalLsnr.beforeSendCustomEvent(this, log, msg))
-                return;
-        }
-
-        impl.sendCustomMessage(msg);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void failNode(UUID nodeId, @Nullable String warning) {
-        impl.failNode(nodeId, warning);
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean isClientMode() throws IllegalStateException {
-        return impl.localNode().isClient();
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void onContextInitialized0(IgniteSpiContext spiCtx) throws IgniteSpiException {
-        super.onContextInitialized0(spiCtx);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void spiStart(@Nullable String igniteInstanceName) throws IgniteSpiException {
-        if (sesTimeout == 0)
-            sesTimeout = ignite.configuration().getFailureDetectionTimeout().intValue();
-
-        assertParameter(sesTimeout > 0, "sessionTimeout > 0");
-        A.notNullOrEmpty(zkConnectionString, "zkConnectionString can not be empty");
-
-        ZookeeperClusterNode locNode = initLocalNode();
-
-        if (log.isInfoEnabled()) {
-            log.info("Start Zookeeper discovery [zkConnectionString=" + zkConnectionString +
-                ", sessionTimeout=" + sesTimeout +
-                ", zkRootPath=" + zkRootPath + ']');
-        }
-
-        impl = new ZookeeperDiscoveryImpl(
-            this,
-            igniteInstanceName,
-            log,
-            zkRootPath,
-            locNode,
-            lsnr,
-            exchange,
-            internalLsnr);
-
-        try {
-            impl.startJoinAndWait();
-        }
-        catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-
-            throw new IgniteSpiException("Failed to join cluster, thread was interrupted", e);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void setInternalListener(IgniteDiscoverySpiInternalListener lsnr) {
-        if (impl != null)
-            impl.internalLsnr = lsnr;
-        else
-            internalLsnr = lsnr;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void simulateNodeFailure() {
-        impl.simulateNodeFailure();
-    }
-
-    /** {@inheritDoc} */
-    @Override public void spiStop() throws IgniteSpiException {
-        if (impl != null)
-            impl.stop();
-    }
-
-    /**
-     * @return Local node instance.
-     */
-    private ZookeeperClusterNode initLocalNode() {
-        assert ignite != null;
-
-        initAddresses();
-
-        ZookeeperClusterNode locNode = new ZookeeperClusterNode(
-            ignite.configuration().getNodeId(),
-            addrs.get1(),
-            addrs.get2(),
-            locNodeVer,
-            locNodeAttrs,
-            consistentId(),
-            sesTimeout,
-            ignite.configuration().isClientMode(),
-            metricsProvider);
-
-        locNode.local(true);
-
-        DiscoverySpiListener lsnr = this.lsnr;
-
-        if (lsnr != null)
-            lsnr.onLocalNodeInitialized(locNode);
-
-        if (log.isDebugEnabled())
-            log.debug("Local node initialized: " + locNode);
-
-        if (metricsProvider != null) {
-            locNode.setMetrics(metricsProvider.metrics());
-            locNode.setCacheMetrics(metricsProvider.cacheMetrics());
-        }
-
-        return locNode;
-    }
-
-    /**
-     * Used in tests (called via reflection).
-     *
-     * @return Copy of SPI.
-     */
-    private ZookeeperDiscoverySpi cloneSpiConfiguration() {
-        ZookeeperDiscoverySpi spi = new ZookeeperDiscoverySpi();
-
-        spi.setZkRootPath(zkRootPath);
-        spi.setZkConnectionString(zkConnectionString);
-        spi.setSessionTimeout(sesTimeout);
-        spi.setJoinTimeout(joinTimeout);
-        spi.setClientReconnectDisabled(clientReconnectDisabled);
-
-        return spi;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(ZookeeperDiscoverySpi.class, this);
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/4805be06/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAbstractCallabck.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAbstractCallabck.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAbstractCallabck.java
deleted file mode 100644
index b80a9dd..0000000
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAbstractCallabck.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.spi.discovery.zk.internal;
-
-import org.apache.ignite.internal.util.GridSpinBusyLock;
-
-/**
- *
- */
-abstract class ZkAbstractCallabck {
-    /** */
-    final ZkRuntimeState rtState;
-
-    /** */
-    private final ZookeeperDiscoveryImpl impl;
-
-    /** */
-    private final GridSpinBusyLock busyLock;
-
-    /**
-     * @param rtState Runtime state.
-     * @param impl Discovery impl.
-     */
-    ZkAbstractCallabck(ZkRuntimeState rtState, ZookeeperDiscoveryImpl impl) {
-        this.rtState = rtState;
-        this.impl = impl;
-
-        busyLock = impl.busyLock;
-    }
-
-    /**
-     * @return {@code True} if is able to start processing.
-     */
-    final boolean onProcessStart() {
-        boolean start = rtState.errForClose == null && busyLock.enterBusy();
-
-        if (!start) {
-            assert rtState.errForClose != null;
-
-            onStartFailed();
-
-            return false;
-        }
-
-        return true;
-    }
-
-    /**
-     *
-     */
-    void onStartFailed() {
-        // No-op.
-    }
-
-    /**
-     *
-     */
-    final void onProcessEnd() {
-        busyLock.leaveBusy();
-    }
-
-    /**
-     * @param e Error.
-     */
-    final void onProcessError(Throwable e) {
-        impl.onFatalError(busyLock, e);
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/4805be06/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAbstractChildrenCallback.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAbstractChildrenCallback.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAbstractChildrenCallback.java
deleted file mode 100644
index 2292e35..0000000
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAbstractChildrenCallback.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.spi.discovery.zk.internal;
-
-import java.util.List;
-import org.apache.zookeeper.AsyncCallback;
-import org.apache.zookeeper.data.Stat;
-
-/**
- *
- */
-abstract class ZkAbstractChildrenCallback extends ZkAbstractCallabck implements AsyncCallback.Children2Callback {
-    /**
-     * @param rtState Runtime state.
-     * @param impl Discovery impl.
-     */
-    ZkAbstractChildrenCallback(ZkRuntimeState rtState, ZookeeperDiscoveryImpl impl) {
-        super(rtState, impl);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
-        if (!onProcessStart())
-            return;
-
-        try {
-            processResult0(rc, path, ctx, children, stat);
-
-            onProcessEnd();
-        }
-        catch (Throwable e) {
-            onProcessError(e);
-        }
-    }
-
-    /**
-     * @param rc
-     * @param path
-     * @param ctx
-     * @param children
-     * @param stat
-     * @throws Exception If failed.
-     */
-    abstract void processResult0(int rc, String path, Object ctx, List<String> children, Stat stat)
-        throws Exception;
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/4805be06/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAbstractWatcher.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAbstractWatcher.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAbstractWatcher.java
deleted file mode 100644
index 9098d05..0000000
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAbstractWatcher.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.spi.discovery.zk.internal;
-
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-
-/**
- *
- */
-abstract class ZkAbstractWatcher extends ZkAbstractCallabck implements Watcher {
-    /**
-     * @param rtState Runtime state.
-     * @param impl Discovery impl.
-     */
-    ZkAbstractWatcher(ZkRuntimeState rtState, ZookeeperDiscoveryImpl impl) {
-        super(rtState, impl);
-    }
-
-    /** {@inheritDoc} */
-    @Override public final void process(WatchedEvent evt) {
-        if (!onProcessStart())
-            return;
-
-        try {
-            process0(evt);
-
-            onProcessEnd();
-        }
-        catch (Throwable e) {
-            onProcessError(e);
-        }
-    }
-
-    /**
-     * @param evt Event.
-     * @throws Exception If failed.
-     */
-    protected abstract void process0(WatchedEvent evt) throws Exception;
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/4805be06/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAliveNodeData.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAliveNodeData.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAliveNodeData.java
deleted file mode 100644
index 9574325..0000000
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAliveNodeData.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.spi.discovery.zk.internal;
-
-import java.io.Serializable;
-import org.apache.ignite.internal.util.typedef.internal.S;
-
-/**
- *
- */
-public class ZkAliveNodeData implements Serializable {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** */
-    long lastProcEvt = -1;
-
-    /** */
-    transient boolean needUpdate;
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(ZkAliveNodeData.class, this);
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/4805be06/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkBulkJoinContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkBulkJoinContext.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkBulkJoinContext.java
deleted file mode 100644
index a186aed..0000000
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkBulkJoinContext.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.spi.discovery.zk.internal;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import org.apache.ignite.internal.util.typedef.T2;
-
-/**
- *
- */
-class ZkBulkJoinContext {
-    /** */
-    List<T2<ZkJoinedNodeEvtData, Map<Integer, Serializable>>> nodes;
-
-    /**
-     * @param nodeEvtData Node event data.
-     * @param discoData Discovery data for node.
-     */
-    void addJoinedNode(ZkJoinedNodeEvtData nodeEvtData, Map<Integer, Serializable> discoData) {
-        if (nodes == null)
-            nodes = new ArrayList<>();
-
-        nodes.add(new T2<>(nodeEvtData, discoData));
-    }
-
-    /**
-     * @return Number of joined nodes.
-     */
-    int nodes() {
-        return nodes != null ? nodes.size() : 0;
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/4805be06/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkClusterNodes.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkClusterNodes.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkClusterNodes.java
deleted file mode 100644
index 70bdc3e..0000000
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkClusterNodes.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.spi.discovery.zk.internal;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentSkipListMap;
-import org.apache.ignite.cluster.ClusterNode;
-
-/**
- *
- */
-public class ZkClusterNodes {
-    /** */
-    final ConcurrentSkipListMap<Long, ZookeeperClusterNode> nodesByOrder = new ConcurrentSkipListMap<>();
-
-    /** */
-    final ConcurrentSkipListMap<Long, ZookeeperClusterNode> nodesByInternalId = new ConcurrentSkipListMap<>();
-
-    /** */
-    final ConcurrentHashMap<UUID, ZookeeperClusterNode> nodesById = new ConcurrentHashMap<>();
-
-    /**
-     * @return Remote nodes.
-     */
-    public Collection<ClusterNode> remoteNodes() {
-        List<ClusterNode> nodes = new ArrayList<>();
-
-        for (ClusterNode node : nodesById.values()) {
-            if (!node.isLocal())
-                nodes.add(node);
-        }
-
-        return nodes;
-    }
-
-    /**
-     * @return Current nodes in topology.
-     */
-    @SuppressWarnings("unchecked")
-    List<ClusterNode> topologySnapshot() {
-        return new ArrayList<>((Collection)nodesByOrder.values());
-    }
-
-    /**
-     * @param node New node.
-     */
-    void addNode(ZookeeperClusterNode node) {
-        assert node.id() != null : node;
-        assert node.order() > 0 : node;
-
-        ZookeeperClusterNode old = nodesById.put(node.id(), node);
-
-        assert old == null : old;
-
-        old = nodesByOrder.put(node.order(), node);
-
-        assert old == null : old;
-
-        old = nodesByInternalId.put(node.internalId(), node);
-
-        assert old == null : old;
-    }
-
-    /**
-     * @param internalId Node internal ID.
-     * @return Removed node.
-     */
-    ZookeeperClusterNode removeNode(long internalId) {
-        ZookeeperClusterNode node = nodesByInternalId.remove(internalId);
-
-        assert node != null : internalId;
-        assert node.order() > 0 : node;
-
-        Object rvmd = nodesByOrder.remove(node.order());
-
-        assert rvmd != null;
-
-        rvmd = nodesById.remove(node.id());
-
-        assert rvmd != null;
-
-        return node;
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/4805be06/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorNodeState.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorNodeState.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorNodeState.java
deleted file mode 100644
index 9c21f13..0000000
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorNodeState.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.spi.discovery.zk.internal;
-
-import java.io.Serializable;
-import java.util.BitSet;
-
-/**
- *
- */
-class ZkCommunicationErrorNodeState implements Serializable {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** */
-    final BitSet commState;
-
-    /** */
-    final Exception err;
-
-    /**
-     * @param commState Communication state.
-     * @param err Error if failed get communication state..
-     */
-    ZkCommunicationErrorNodeState(BitSet commState, Exception err) {
-        assert commState != null || err != null;
-
-        this.commState = commState;
-        this.err = err;
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/4805be06/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java
deleted file mode 100644
index accda6e..0000000
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java
+++ /dev/null
@@ -1,411 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.spi.discovery.zk.internal;
-
-import java.util.BitSet;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.util.future.GridFinishedFuture;
-import org.apache.ignite.internal.util.future.GridFutureAdapter;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.lang.IgniteFuture;
-import org.apache.ignite.lang.IgniteInClosure;
-import org.apache.ignite.lang.IgniteUuid;
-import org.apache.ignite.spi.IgniteSpiTimeoutObject;
-import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
-import org.jboss.netty.util.internal.ConcurrentHashMap;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Future is created on each node when either connection error occurs or resolve communication error request
- * received.
- */
-class ZkCommunicationErrorProcessFuture extends GridFutureAdapter<Void> implements IgniteSpiTimeoutObject, Runnable {
-    /** */
-    private final ZookeeperDiscoveryImpl impl;
-
-    /** */
-    private final IgniteLogger log;
-
-    /** */
-    private final Map<Long, GridFutureAdapter<Boolean>> nodeFuts = new ConcurrentHashMap<>();
-
-    /** */
-    private final long endTime;
-
-    /** */
-    private final IgniteUuid id;
-
-    /** */
-    private State state;
-
-    /** */
-    private long resolveTopVer;
-
-    /** */
-    private Set<Long> resFailedNodes;
-
-    /** */
-    private Exception resErr;
-
-    /** */
-    private ZkDistributedCollectDataFuture collectResFut;
-
-    /**
-     * @param impl Discovery impl.
-     * @param timeout Timeout to wait before initiating resolve process.
-     * @return Future.
-     */
-    static ZkCommunicationErrorProcessFuture createOnCommunicationError(ZookeeperDiscoveryImpl impl, long timeout) {
-        return new ZkCommunicationErrorProcessFuture(impl, State.WAIT_TIMEOUT, timeout);
-    }
-
-    /**
-     * @param impl Discovery impl.
-     * @return Future.
-     */
-    static ZkCommunicationErrorProcessFuture createOnStartResolveRequest(ZookeeperDiscoveryImpl impl) {
-        return new ZkCommunicationErrorProcessFuture(impl, State.RESOLVE_STARTED, 0);
-    }
-
-    /**
-     * @param impl Discovery implementation.
-     * @param state Initial state.
-     * @param timeout Wait timeout before initiating communication errors resolve.
-     */
-    private ZkCommunicationErrorProcessFuture(ZookeeperDiscoveryImpl impl, State state, long timeout) {
-        assert state != State.DONE;
-
-        this.impl = impl;
-        this.log = impl.log();
-
-        if (state == State.WAIT_TIMEOUT) {
-            assert timeout > 0 : timeout;
-
-            id = IgniteUuid.fromUuid(impl.localNode().id());
-            endTime = System.currentTimeMillis() + timeout;
-        }
-        else {
-            id = null;
-            endTime = 0;
-        }
-
-        this.state = state;
-    }
-
-    /** {@inheritDoc} */
-    @Nullable @Override public IgniteLogger logger() {
-        return log;
-    }
-
-    /**
-     * @param collectResFut Collect nodes' communication status future.
-     */
-    void nodeResultCollectFuture(ZkDistributedCollectDataFuture collectResFut) {
-        assert this.collectResFut == null : collectResFut;
-
-        this.collectResFut = collectResFut;
-    }
-
-    /**
-     * @param top Topology.
-     * @throws Exception If failed.
-     */
-    void onTopologyChange(ZkClusterNodes top) throws Exception {
-        for (Map.Entry<Long, GridFutureAdapter<Boolean>> e : nodeFuts.entrySet()) {
-            if (!top.nodesByOrder.containsKey(e.getKey()))
-                e.getValue().onDone(false);
-        }
-
-        if (collectResFut != null)
-            collectResFut.onTopologyChange(top);
-    }
-
-    /**
-     * @param rtState Runtime state.
-     * @param futPath Future path.
-     * @param nodes Nodes to ping.
-     */
-    void checkConnection(final ZkRuntimeState rtState, final String futPath, List<ClusterNode> nodes) {
-        final TcpCommunicationSpi spi = (TcpCommunicationSpi)impl.spi.ignite().configuration().getCommunicationSpi();
-
-        IgniteFuture<BitSet> fut = spi.checkConnection(nodes);
-
-        fut.listen(new IgniteInClosure<IgniteFuture<BitSet>>() {
-            @Override public void apply(final IgniteFuture<BitSet> fut) {
-                // Future completed either from NIO thread or timeout worker, save result from another thread.
-                impl.runInWorkerThread(new ZkRunnable(rtState, impl) {
-                    @Override public void run0() throws Exception {
-                        BitSet commState = null;
-                        Exception err = null;
-
-                        try {
-                            commState = fut.get();
-                        }
-                        catch (Exception e) {
-                            err = e;
-                        }
-
-                        ZkCommunicationErrorNodeState state = new ZkCommunicationErrorNodeState(commState, err);
-
-                        ZkDistributedCollectDataFuture.saveNodeResult(futPath,
-                            rtState.zkClient,
-                            impl.localNode().order(),
-                            impl.marshalZip(state));
-                    }
-
-                    @Override void onStartFailed() {
-                        onError(rtState.errForClose);
-                    }
-                });
-
-            }
-        });
-    }
-
-    /**
-     *
-     */
-    void scheduleCheckOnTimeout() {
-        synchronized (this) {
-            if (state == State.WAIT_TIMEOUT)
-                impl.spi.getSpiContext().addTimeoutObject(this);
-        }
-    }
-
-    /**
-     * @param topVer Topology version.
-     * @return {@code False} if future was already completed and need create another future instance.
-     */
-    boolean onStartResolveRequest(long topVer) {
-        synchronized (this) {
-            if (state == State.DONE)
-                return false;
-
-            if (state == State.WAIT_TIMEOUT)
-                impl.spi.getSpiContext().removeTimeoutObject(this);
-
-            assert resolveTopVer == 0 : resolveTopVer;
-
-            resolveTopVer = topVer;
-
-            state = State.RESOLVE_STARTED;
-        }
-
-        return true;
-    }
-
-    /**
-     * @param err Error.
-     */
-    void onError(Exception err) {
-        assert err != null;
-
-        Map<Long, GridFutureAdapter<Boolean>> futs;
-
-        synchronized (this) {
-            if (state == State.DONE) {
-                assert resErr != null;
-
-                return;
-            }
-
-            state = State.DONE;
-
-            resErr = err;
-
-            futs = nodeFuts; // nodeFuts should not be modified after state changed to DONE.
-        }
-
-        for (Map.Entry<Long, GridFutureAdapter<Boolean>> e : futs.entrySet())
-            e.getValue().onDone(err);
-
-        onDone(err);
-    }
-
-    /**
-     * @param failedNodes Node failed as result of resolve process.
-     */
-    void onFinishResolve(Set<Long> failedNodes) {
-        Map<Long, GridFutureAdapter<Boolean>> futs;
-
-        synchronized (this) {
-            if (state == State.DONE) {
-                assert resErr != null;
-
-                return;
-            }
-
-            assert state == State.RESOLVE_STARTED : state;
-
-            state = State.DONE;
-
-            resFailedNodes = failedNodes;
-
-            futs = nodeFuts; // nodeFuts should not be modified after state changed to DONE.
-        }
-
-        for (Map.Entry<Long, GridFutureAdapter<Boolean>> e : futs.entrySet()) {
-            Boolean res = !F.contains(resFailedNodes, e.getKey());
-
-            e.getValue().onDone(res);
-        }
-
-        onDone();
-    }
-
-    /**
-     * @param node Node.
-     * @return Future finished when communication error resolve is done or {@code null} if another
-     *      resolve process should be started.
-     */
-    @Nullable IgniteInternalFuture<Boolean> nodeStatusFuture(ClusterNode node) {
-        GridFutureAdapter<Boolean> fut;
-
-        synchronized (this) {
-            if (state == State.DONE) {
-                if (resolveTopVer != 0 && node.order() <= resolveTopVer) {
-                    Boolean res = !F.contains(resFailedNodes, node.order());
-
-                    return new GridFinishedFuture<>(res);
-                }
-                else
-                    return null;
-            }
-
-            fut = nodeFuts.get(node.order());
-
-            if (fut == null)
-                nodeFuts.put(node.order(), fut = new GridFutureAdapter<>());
-        }
-
-        if (impl.node(node.order()) == null)
-            fut.onDone(false);
-
-        return fut;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void run() {
-        // Run from zk discovery worker pool after timeout.
-        if (needProcessTimeout()) {
-            try {
-                UUID reqId = UUID.randomUUID();
-
-                if (log.isInfoEnabled()) {
-                    log.info("Initiate cluster-wide communication error resolve process [reqId=" + reqId +
-                        ", errNodes=" + nodeFuts.size() + ']');
-                }
-
-                impl.sendCustomMessage(new ZkCommunicationErrorResolveStartMessage(reqId));
-            }
-            catch (Exception e) {
-                Collection<GridFutureAdapter<Boolean>> futs;
-
-                synchronized (this) {
-                    if (state != State.WAIT_TIMEOUT)
-                        return;
-
-                    state = State.DONE;
-                    resErr = e;
-
-                    futs = nodeFuts.values(); // nodeFuts should not be modified after state changed to DONE.
-                }
-
-                for (GridFutureAdapter<Boolean> fut : futs)
-                    fut.onDone(e);
-
-                onDone(e);
-            }
-        }
-    }
-
-    /**
-     * @return {@code True} if need initiate resolve process after timeout expired.
-     */
-    private boolean needProcessTimeout() {
-        synchronized (this) {
-            if (state != State.WAIT_TIMEOUT)
-                return false;
-
-            for (GridFutureAdapter<Boolean> fut : nodeFuts.values()) {
-                if (!fut.isDone())
-                    return true;
-            }
-
-            state = State.DONE;
-        }
-
-        onDone(null, null);
-
-        return false;
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgniteUuid id() {
-        return id;
-    }
-
-    /** {@inheritDoc} */
-    @Override public long endTime() {
-        return endTime;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void onTimeout() {
-        if (needProcessTimeout())
-            impl.runInWorkerThread(this);
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean onDone(@Nullable Void res, @Nullable Throwable err) {
-        if (super.onDone(res, err)) {
-            impl.clearCommunicationErrorProcessFuture(this);
-
-            return true;
-        }
-
-        return false;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(ZkCommunicationErrorProcessFuture.class, this);
-    }
-
-    /**
-     *
-     */
-    enum State {
-        /** */
-        DONE,
-
-        /** */
-        WAIT_TIMEOUT,
-
-        /** */
-        RESOLVE_STARTED
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/4805be06/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveFinishMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveFinishMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveFinishMessage.java
deleted file mode 100644
index 9b7476c..0000000
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveFinishMessage.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.spi.discovery.zk.internal;
-
-import java.util.UUID;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
-import org.jetbrains.annotations.Nullable;
-
-/**
- *
- */
-class ZkCommunicationErrorResolveFinishMessage implements DiscoverySpiCustomMessage, ZkInternalMessage {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** */
-    final UUID futId;
-
-    /** */
-    final long topVer;
-
-    /** */
-    transient ZkCommunicationErrorResolveResult res;
-
-    /**
-     * @param futId Future ID.
-     * @param topVer Topology version when resolve process finished.
-     */
-    ZkCommunicationErrorResolveFinishMessage(UUID futId, long topVer) {
-        this.futId = futId;
-        this.topVer = topVer;
-    }
-
-    /** {@inheritDoc} */
-    @Nullable @Override public DiscoverySpiCustomMessage ackMessage() {
-        return null;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean isMutable() {
-        return false;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean stopProcess() {
-        return false;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(ZkCommunicationErrorResolveFinishMessage.class, this);
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/4805be06/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveResult.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveResult.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveResult.java
deleted file mode 100644
index 23495aa..0000000
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveResult.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.spi.discovery.zk.internal;
-
-import java.io.Serializable;
-import org.apache.ignite.internal.util.GridLongList;
-import org.jetbrains.annotations.Nullable;
-
-/**
- *
- */
-class ZkCommunicationErrorResolveResult implements Serializable {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** */
-    final GridLongList killedNodes;
-
-    /** */
-    final Exception err;
-
-    /**
-     * @param killedNodes Killed nodes.
-     * @param err Error.
-     */
-    ZkCommunicationErrorResolveResult(@Nullable GridLongList killedNodes, Exception err) {
-        this.killedNodes = killedNodes;
-        this.err = err;
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/4805be06/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveStartMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveStartMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveStartMessage.java
deleted file mode 100644
index bb63f30..0000000
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveStartMessage.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.spi.discovery.zk.internal;
-
-import java.util.UUID;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
-import org.jetbrains.annotations.Nullable;
-
-/**
- *
- */
-public class ZkCommunicationErrorResolveStartMessage implements DiscoverySpiCustomMessage, ZkInternalMessage {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** */
-    final UUID id;
-
-    /**
-     * @param id Unique ID.
-     */
-    ZkCommunicationErrorResolveStartMessage(UUID id) {
-        this.id = id;
-    }
-
-    /** {@inheritDoc} */
-    @Nullable @Override public DiscoverySpiCustomMessage ackMessage() {
-        return null;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean isMutable() {
-        return false;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean stopProcess() {
-        return false;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(ZkCommunicationErrorResolveStartMessage.class, this);
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/4805be06/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationFailureContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationFailureContext.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationFailureContext.java
deleted file mode 100644
index d27b717..0000000
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationFailureContext.java
+++ /dev/null
@@ -1,188 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.spi.discovery.zk.internal;
-
-import java.util.ArrayList;
-import java.util.BitSet;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import org.apache.ignite.cache.CacheMode;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.configuration.CacheConfiguration;
-import org.apache.ignite.configuration.CommunicationFailureContext;
-import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
-import org.apache.ignite.internal.processors.cache.CacheGroupContext;
-import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
-import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
-import org.apache.ignite.internal.util.typedef.internal.CU;
-import org.apache.ignite.internal.util.typedef.internal.U;
-
-/**
- *
- */
-class ZkCommunicationFailureContext implements CommunicationFailureContext {
-    /** */
-    private static final Comparator<ClusterNode> NODE_ORDER_CMP = new Comparator<ClusterNode>() {
-        @Override public int compare(ClusterNode node1, ClusterNode node2) {
-            return Long.compare(node1.order(), node2.order());
-        }
-    };
-
-    /** */
-    private Set<ClusterNode> killedNodes = new HashSet<>();
-
-    /** */
-    private final Map<UUID, BitSet> nodesState;
-
-    /** */
-    private final List<ClusterNode> initialNodes;
-
-    /** */
-    private final List<ClusterNode> curNodes;
-
-    /** */
-    private final GridCacheSharedContext<?, ?> ctx;
-
-    /**
-     * @param ctx Context.
-     * @param curNodes Current topology snapshot.
-     * @param initialNodes Topology snapshot when communication error resolve started.
-     * @param nodesState Nodes communication state.
-     */
-    ZkCommunicationFailureContext(
-        GridCacheSharedContext<?, ?> ctx,
-        List<ClusterNode> curNodes,
-        List<ClusterNode> initialNodes,
-        Map<UUID, BitSet> nodesState)
-    {
-        this.ctx = ctx;
-        this.curNodes = Collections.unmodifiableList(curNodes);
-        this.initialNodes = initialNodes;
-        this.nodesState = nodesState;
-    }
-
-    /** {@inheritDoc} */
-    @Override public List<ClusterNode> topologySnapshot() {
-        return curNodes;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean connectionAvailable(ClusterNode node1, ClusterNode node2) {
-        BitSet nodeState = nodesState.get(node1.id());
-
-        if (nodeState == null)
-            throw new IllegalArgumentException("Invalid node: " + node1);
-
-        int nodeIdx = Collections.binarySearch(initialNodes, node2, NODE_ORDER_CMP);
-
-        if (nodeIdx < 0)
-            throw new IllegalArgumentException("Invalid node: " + node2);
-
-        assert nodeIdx < nodeState.size() : nodeIdx;
-
-        return nodeState.get(nodeIdx);
-    }
-
-    /** {@inheritDoc} */
-    @Override public Map<String, CacheConfiguration<?, ?>> startedCaches() {
-        Map<Integer, DynamicCacheDescriptor> cachesMap = ctx.affinity().caches();
-
-        Map<String, CacheConfiguration<?, ?>> res = U.newHashMap(cachesMap.size());
-
-        for (DynamicCacheDescriptor desc : cachesMap.values()) {
-            if (desc.cacheType().userCache())
-                res.put(desc.cacheName(), desc.cacheConfiguration());
-        }
-
-        return res;
-    }
-
-    /** {@inheritDoc} */
-    @Override public List<List<ClusterNode>> cacheAffinity(String cacheName) {
-        if (cacheName == null)
-            throw new NullPointerException("Null cache name.");
-
-        DynamicCacheDescriptor cacheDesc = ctx.affinity().caches().get(CU.cacheId(cacheName));
-
-        if (cacheDesc == null)
-            throw new IllegalArgumentException("Invalid cache name: " + cacheName);
-
-        GridAffinityAssignmentCache aff = ctx.affinity().groupAffinity(cacheDesc.groupId());
-
-        assert aff != null : cacheName;
-
-        return aff.readyAssignments(aff.lastVersion());
-    }
-
-    /** {@inheritDoc} */
-    @Override public List<List<ClusterNode>> cachePartitionOwners(String cacheName) {
-        if (cacheName == null)
-            throw new NullPointerException("Null cache name.");
-
-        DynamicCacheDescriptor cacheDesc = ctx.affinity().caches().get(CU.cacheId(cacheName));
-
-        if (cacheDesc == null)
-            throw new IllegalArgumentException("Invalid cache name: " + cacheName);
-
-        if (cacheDesc.cacheConfiguration().getCacheMode() == CacheMode.LOCAL)
-            return Collections.emptyList();
-
-        CacheGroupContext grp = ctx.cache().cacheGroup(cacheDesc.groupId());
-
-        GridDhtPartitionTopology top;
-
-        if (grp == null) {
-            top = ctx.exchange().clientTopologyIfExists(cacheDesc.groupId());
-
-            assert top != null : cacheName;
-        }
-        else
-            top = grp.topology();
-
-        return top.allOwners();
-    }
-
-    /** {@inheritDoc} */
-    @Override public void killNode(ClusterNode node) {
-        if (node == null)
-            throw new NullPointerException();
-
-        if (Collections.binarySearch(curNodes, node, NODE_ORDER_CMP) < 0)
-            throw new IllegalArgumentException("Invalid node: " + node);
-
-        killedNodes.add(node);
-    }
-
-    /**
-     * @return Nodes to fail.
-     */
-    Set<ClusterNode> killedNodes() {
-        return killedNodes;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return "ZkCommunicationFailureContext []";
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/4805be06/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryCustomEventData.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryCustomEventData.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryCustomEventData.java
deleted file mode 100644
index 21dfe62..0000000
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryCustomEventData.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.spi.discovery.zk.internal;
-
-import java.util.UUID;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
-
-/**
- *
- */
-class ZkDiscoveryCustomEventData extends ZkDiscoveryEventData {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** */
-    final long origEvtId;
-
-    /** */
-    final UUID sndNodeId;
-
-    /** */
-    final String evtPath;
-
-    /** Message instance (can be marshalled as part of ZkDiscoveryCustomEventData or stored in separate znode. */
-    DiscoverySpiCustomMessage msg;
-
-    /** Unmarshalled message. */
-    transient DiscoverySpiCustomMessage resolvedMsg;
-
-    /**
-     * @param evtId Event ID.
-     * @param origEvtId For acknowledge events ID of original event.
-     * @param topVer Topology version.
-     * @param sndNodeId Sender node ID.
-     * @param msg Message instance.
-     * @param evtPath Event path.
-     */
-    ZkDiscoveryCustomEventData(
-        long evtId,
-        long origEvtId,
-        long topVer,
-        UUID sndNodeId,
-        DiscoverySpiCustomMessage msg,
-        String evtPath)
-    {
-        super(evtId, ZK_EVT_CUSTOM_EVT, topVer);
-
-        assert sndNodeId != null;
-        assert msg != null || origEvtId != 0 || !F.isEmpty(evtPath);
-
-        this.origEvtId = origEvtId;
-        this.msg = msg;
-        this.sndNodeId = sndNodeId;
-        this.evtPath = evtPath;
-    }
-
-    /**
-     * @return {@code True} for custom event ack message.
-     */
-    boolean ackEvent() {
-        return origEvtId != 0;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return "ZkDiscoveryCustomEventData [" +
-            "evtId=" + eventId() +
-            ", topVer=" + topologyVersion() +
-            ", sndNode=" + sndNodeId +
-            ", ack=" + ackEvent() +
-            ']';
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/4805be06/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventData.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventData.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventData.java
deleted file mode 100644
index d667a17..0000000
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventData.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.spi.discovery.zk.internal;
-
-import java.io.Serializable;
-import java.util.Collection;
-import java.util.Set;
-import org.apache.ignite.internal.util.typedef.internal.U;
-
-/**
- *
- */
-abstract class ZkDiscoveryEventData implements Serializable {
-    /** */
-    static final byte ZK_EVT_NODE_JOIN = 1;
-
-    /** */
-    static final byte ZK_EVT_NODE_FAILED = 2;
-
-    /** */
-    static final byte ZK_EVT_CUSTOM_EVT = 3;
-
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** */
-    private final long evtId;
-
-    /** */
-    private final byte evtType;
-
-    /** */
-    private final long topVer;
-
-    /** */
-    private transient Set<Long> remainingAcks;
-
-    /** */
-    int flags;
-
-    /**
-     * @param evtId Event ID.
-     * @param evtType Event type.
-     * @param topVer Topology version.
-     */
-    ZkDiscoveryEventData(long evtId, byte evtType, long topVer) {
-        assert evtType == ZK_EVT_NODE_JOIN || evtType == ZK_EVT_NODE_FAILED || evtType == ZK_EVT_CUSTOM_EVT : evtType;
-
-        this.evtId = evtId;
-        this.evtType = evtType;
-        this.topVer = topVer;
-    }
-
-    /**
-     * @param nodes Current nodes in topology.
-     */
-    void initRemainingAcks(Collection<ZookeeperClusterNode> nodes) {
-        assert remainingAcks == null : this;
-
-        remainingAcks = U.newHashSet(nodes.size());
-
-        for (ZookeeperClusterNode node : nodes) {
-            if (!node.isLocal() && node.order() <= topVer) {
-                boolean add = remainingAcks.add(node.internalId());
-
-                assert add : node;
-            }
-        }
-    }
-
-    /**
-     * @param node Node.
-     */
-    void addRemainingAck(ZookeeperClusterNode node) {
-        assert node.order() <= topVer : node;
-
-        boolean add = remainingAcks.add(node.internalId());
-
-        assert add : node;
-    }
-
-    /**
-     * @return {@code True} if all nodes processed event.
-     */
-    boolean allAcksReceived() {
-        return remainingAcks.isEmpty();
-    }
-
-    /**
-     * @return Remaining acks.
-     */
-    Set<Long> remainingAcks() {
-        return remainingAcks;
-    }
-
-    /**
-     * @param nodeInternalId Node ID.
-     * @param ackEvtId Last event ID processed on node.
-     * @return {@code True} if all nodes processed event.
-     */
-    boolean onAckReceived(Long nodeInternalId, long ackEvtId) {
-        assert remainingAcks != null;
-
-        if (ackEvtId >= evtId)
-            remainingAcks.remove(nodeInternalId);
-
-        return remainingAcks.isEmpty();
-    }
-
-    /**
-     * @param node Failed node.
-     * @return {@code True} if all nodes processed event.
-     */
-    boolean onNodeFail(ZookeeperClusterNode node) {
-        assert remainingAcks != null : this;
-
-        remainingAcks.remove(node.internalId());
-
-        return remainingAcks.isEmpty();
-    }
-
-    /**
-     * @param flag Flag mask.
-     * @return {@code True} if flag set.
-     */
-    boolean flagSet(int flag) {
-        return (flags & flag) == flag;
-    }
-
-    /**
-     * @return Event ID.
-     */
-    long eventId() {
-        return evtId;
-    }
-
-    /**
-     * @return Event type.
-     */
-    byte eventType() {
-        return evtType;
-    }
-
-    /**
-     * @return Event topology version.
-     */
-    long topologyVersion() {
-        return topVer;
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/4805be06/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java
deleted file mode 100644
index dce861b..0000000
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.spi.discovery.zk.internal;
-
-import java.io.Serializable;
-import java.util.Collection;
-import java.util.TreeMap;
-import java.util.UUID;
-import org.jetbrains.annotations.Nullable;
-
-/**
- *
- */
-class ZkDiscoveryEventsData implements Serializable {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** Unique cluster ID (generated when first node in cluster starts). */
-    final UUID clusterId;
-
-    /** Internal order of last processed custom event. */
-    long procCustEvt = -1;
-
-    /** Event ID counter. */
-    long evtIdGen;
-
-    /** Current topology version. */
-    long topVer;
-
-    /** Max node internal order in cluster. */
-    long maxInternalOrder;
-
-    /** Cluster start time (recorded when first node in cluster starts). */
-    final long clusterStartTime;
-
-    /** Events to process. */
-    final TreeMap<Long, ZkDiscoveryEventData> evts;
-
-    /** ID of current active communication error resolve process. */
-    private UUID commErrFutId;
-
-    /**
-     * @param clusterStartTime Start time of first node in cluster.
-     * @return Events.
-     */
-    static ZkDiscoveryEventsData createForNewCluster(long clusterStartTime) {
-        return new ZkDiscoveryEventsData(
-            UUID.randomUUID(),
-            clusterStartTime,
-            1L,
-            new TreeMap<Long, ZkDiscoveryEventData>()
-        );
-    }
-
-    /**
-     * @param clusterId Cluster ID.
-     * @param topVer Current topology version.
-     * @param clusterStartTime Cluster start time.
-     * @param evts Events history.
-     */
-    private ZkDiscoveryEventsData(
-        UUID clusterId,
-        long clusterStartTime,
-        long topVer,
-        TreeMap<Long, ZkDiscoveryEventData> evts)
-    {
-        this.clusterId = clusterId;
-        this.clusterStartTime = clusterStartTime;
-        this.topVer = topVer;
-        this.evts = evts;
-    }
-
-    /**
-     * @param node Joined node.
-     */
-    void onNodeJoin(ZookeeperClusterNode node) {
-        if (node.internalId() > maxInternalOrder)
-            maxInternalOrder = node.internalId();
-    }
-
-    /**
-     * @return Future ID.
-     */
-    @Nullable UUID communicationErrorResolveFutureId() {
-        return commErrFutId;
-    }
-
-    /**
-     * @param id Future ID.
-     */
-     void communicationErrorResolveFutureId(@Nullable UUID id) {
-        commErrFutId = id;
-    }
-
-    /**
-     * @param nodes Current nodes in topology (these nodes should ack that event processed).
-     * @param evt Event.
-     */
-    void addEvent(Collection<ZookeeperClusterNode> nodes, ZkDiscoveryEventData evt) {
-        Object old = evts.put(evt.eventId(), evt);
-
-        assert old == null : old;
-
-        evt.initRemainingAcks(nodes);
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/4805be06/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeFailEventData.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeFailEventData.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeFailEventData.java
deleted file mode 100644
index c76158f..0000000
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeFailEventData.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.spi.discovery.zk.internal;
-
-/**
- *
- */
-class ZkDiscoveryNodeFailEventData extends ZkDiscoveryEventData {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** */
-    private long failedNodeInternalId;
-
-    /**
-     * @param evtId Event ID.
-     * @param topVer Topology version.
-     * @param failedNodeInternalId Failed node ID.
-     */
-    ZkDiscoveryNodeFailEventData(long evtId, long topVer, long failedNodeInternalId) {
-        super(evtId, ZK_EVT_NODE_FAILED, topVer);
-
-        this.failedNodeInternalId = failedNodeInternalId;
-    }
-
-    /**
-     * @return Failed node ID.
-     */
-    long failedNodeInternalId() {
-        return failedNodeInternalId;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return "ZkDiscoveryNodeFailEventData [" +
-            "evtId=" + eventId() +
-            ", topVer=" + topologyVersion() +
-            ", nodeId=" + failedNodeInternalId + ']';
-    }
-}


Mime
View raw message