ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject ignite git commit: zk
Date Fri, 10 Nov 2017 12:11:39 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-zk d1f730789 -> 48175cf3b


zk


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/48175cf3
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/48175cf3
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/48175cf3

Branch: refs/heads/ignite-zk
Commit: 48175cf3b5a18578173736ba1cbc4493e1327333
Parents: d1f7307
Author: sboikov <sboikov@gridgain.com>
Authored: Fri Nov 10 15:10:59 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Fri Nov 10 15:11:26 2017 +0300

----------------------------------------------------------------------
 .../GridCachePartitionExchangeManager.java      |   6 -
 .../spi/discovery/zk/ZookeeperClusterNode.java  | 201 +++++
 .../spi/discovery/zk/ZookeeperDiscoverySpi.java | 889 +++++++++++++++++++
 .../zk/ZookeeperDiscoverySpiBasicTest.java      | 113 +++
 4 files changed, 1203 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/48175cf3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index fe9ed29..30d3d26 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -536,12 +536,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
         for (ClusterNode n : cctx.discovery().remoteNodes())
             cctx.versions().onReceived(n.id(), n.metrics().getLastDataVersion());
 
-        ClusterNode loc = cctx.localNode();
-
-        long startTime = loc.metrics().getStartTime();
-
-        assert startTime > 0;
-
         DiscoveryLocalJoinData locJoin = cctx.discovery().localJoin();
 
         GridDhtPartitionsExchangeFuture fut = null;

http://git-wip-us.apache.org/repos/asf/ignite/blob/48175cf3/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperClusterNode.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperClusterNode.java
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperClusterNode.java
new file mode 100644
index 0000000..8c28df4
--- /dev/null
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperClusterNode.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.cluster.ClusterMetrics;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.ClusterMetricsSnapshot;
+import org.apache.ignite.internal.IgniteNodeAttributes;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.lang.IgniteProductVersion;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_NODE_CONSISTENT_ID;
+
+/**
+ * {@link ClusterNode} implementation used by {@link ZookeeperDiscoverySpi}.
+ * <p>
+ * The node ID, consistent ID, order, version and attributes are serialized with the node;
+ * the local-node flag and the metrics object are {@code transient}.
+ */
+public class ZookeeperClusterNode implements ClusterNode, Serializable {
+    /** Node ID. */
+    private UUID id;
+
+    /** Consistent ID. */
+    private Serializable consistentId;
+
+    /** Node order; stays {@code 0} until {@link #order(long)} is called. */
+    private long order;
+
+    /** Node version. */
+    private IgniteProductVersion ver;
+
+    /** Node attributes. */
+    @GridToStringExclude
+    private Map<String, Object> attrs;
+
+    /** Local node flag; not serialized. */
+    private transient boolean loc;
+
+    /** Node metrics, created lazily by {@link #metrics()}; not serialized. TODO */
+    private transient ClusterMetrics metrics;
+
+    /**
+     * @param id Node ID.
+     * @param ver Node version.
+     * @param attrs Node attributes.
+     * @param consistentId Consistent ID.
+     */
+    public ZookeeperClusterNode(UUID id,
+        IgniteProductVersion ver,
+        Map<String, Object> attrs,
+        Serializable consistentId) {
+        assert id != null;
+        assert consistentId != null;
+
+        this.id = id;
+        this.ver = ver;
+        this.attrs = U.sealMap(attrs);
+        this.consistentId = consistentId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public UUID id() {
+        return id;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Object consistentId() {
+        return consistentId;
+    }
+
+    /**
+     * Sets consistent globally unique node ID which survives node restarts.
+     * <p>
+     * The attribute map is replaced by an unmodifiable copy that also carries the new ID under
+     * {@code ATTR_NODE_CONSISTENT_ID}.
+     *
+     * @param consistentId Consistent globally unique node ID.
+     */
+    public void setConsistentId(Serializable consistentId) {
+        this.consistentId = consistentId;
+
+        final Map<String, Object> map = new HashMap<>(attrs);
+
+        map.put(ATTR_NODE_CONSISTENT_ID, consistentId);
+
+        attrs = Collections.unmodifiableMap(map);
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public <T> T attribute(String name) {
+        // Even though discovery SPI removes this attribute after authentication, keep this
+        // check for safety.
+        if (IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS.equals(name))
+            return null;
+
+        return (T)attrs.get(name);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Creates an empty {@link ClusterMetricsSnapshot} on first access and returns the same
+     * instance afterwards.
+     */
+    @Override public ClusterMetrics metrics() {
+        if (metrics == null)
+            metrics = new ClusterMetricsSnapshot();
+
+        return metrics;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * The returned view hides the security credentials attribute.
+     */
+    @Override public Map<String, Object> attributes() {
+        // Even though discovery SPI removes this attribute after authentication, keep this
+        // check for safety.
+        return F.view(attrs, new IgnitePredicate<String>() {
+            @Override public boolean apply(String s) {
+                return !IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS.equals(s);
+            }
+        });
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * This implementation always returns an empty list.
+     */
+    @Override public Collection<String> addresses() {
+        return Collections.emptyList();
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * This implementation always returns an empty list.
+     */
+    @Override public Collection<String> hostNames() {
+        return Collections.emptyList();
+    }
+
+    /** {@inheritDoc} */
+    @Override public long order() {
+        return order;
+    }
+
+    /**
+     * Sets the node order; the value is asserted to be positive.
+     *
+     * @param order Order of the node.
+     */
+    public void order(long order) {
+        assert order > 0 : order;
+
+        this.order = order;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteProductVersion version() {
+        return ver;
+    }
+
+    /**
+     * @param loc Local node flag.
+     */
+    void local(boolean loc) {
+        this.loc = loc;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isLocal() {
+        return loc;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * This implementation always returns {@code false}.
+     */
+    @Override public boolean isDaemon() {
+        return false;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * This implementation always returns {@code false}.
+     */
+    @Override public boolean isClient() {
+        return false;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Based on the node ID only.
+     */
+    @Override public int hashCode() {
+        return id.hashCode();
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Delegates to {@code F.eqNodes}; presumably this compares nodes by ID - TODO confirm.
+     */
+    @Override public boolean equals(Object obj) {
+        return F.eqNodes(this, obj);
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return "ZookeeperClusterNode [id=" + id + ", order=" + order + ']';
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/48175cf3/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
new file mode 100644
index 0000000..c5dd0dc
--- /dev/null
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
@@ -0,0 +1,889 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.events.EventType;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteProductVersion;
+import org.apache.ignite.marshaller.jdk.JdkMarshaller;
+import org.apache.ignite.resources.LoggerResource;
+import org.apache.ignite.spi.IgniteSpiAdapter;
+import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.IgniteSpiMultipleInstancesSupport;
+import org.apache.ignite.spi.discovery.DiscoveryMetricsProvider;
+import org.apache.ignite.spi.discovery.DiscoverySpi;
+import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
+import org.apache.ignite.spi.discovery.DiscoverySpiDataExchange;
+import org.apache.ignite.spi.discovery.DiscoverySpiHistorySupport;
+import org.apache.ignite.spi.discovery.DiscoverySpiListener;
+import org.apache.ignite.spi.discovery.DiscoverySpiNodeAuthenticator;
+import org.apache.ignite.spi.discovery.DiscoverySpiOrderSupport;
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.Op;
+import org.apache.zookeeper.OpResult;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ *
+ */
+@IgniteSpiMultipleInstancesSupport(true)
+@DiscoverySpiOrderSupport(true)
+@DiscoverySpiHistorySupport(true)
+public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi {
+    /** */
+    private static final String IGNITE_PATH = "/ignite";
+
+    /** */
+    private static final String CLUSTER_PATH = IGNITE_PATH + "/cluster";
+
+    /** */
+    private static final String EVENTS_PATH = CLUSTER_PATH + "/events";
+
+    /** */
+    private static final String JOIN_HIST_PATH = CLUSTER_PATH + "/joinHist";
+
+    /** */
+    private static final String ALIVE_NODES_PATH = CLUSTER_PATH + "/alive";
+
+    /** */
+    private String connectString;
+
+    /** */
+    private DiscoverySpiListener lsnr;
+
+    /** */
+    private DiscoverySpiDataExchange exchange;
+
+    /** */
+    private DiscoveryMetricsProvider metricsProvider;
+
+    /** */
+    private ZooKeeper zk;
+
+    /** */
+    private int sesTimeout = 5000;
+
+    /** */
+    private final ZookeeperWatcher zkWatcher;
+
+    /** */
+    private final JdkMarshaller marsh = new JdkMarshaller();
+
+    /** */
+    private final NodesUpdateCallback nodesUpdateCallback;
+
+    /** */
+    private final DataUpdateCallback dataUpdateCallback;
+
+    /** */
+    private ZookeeperClusterNode locNode;
+
+    /** */
+    private Map<String, Object> locNodeAttrs;
+
+    /** */
+    private IgniteProductVersion locNodeVer;
+
+    /** */
+    private long gridStartTime;
+
+    /** */
+    @LoggerResource
+    private IgniteLogger log;
+
+    /** */
+    private CountDownLatch joinLatch = new CountDownLatch(1);
+
+    /**
+     *
+     */
+    public ZookeeperDiscoverySpi() {
+        zkWatcher = new ZookeeperWatcher();
+
+        nodesUpdateCallback = new NodesUpdateCallback();
+        dataUpdateCallback = new DataUpdateCallback();
+    }
+
+    public int getSessionTimeout() {
+        return sesTimeout;
+    }
+
+    public void setSessionTimeout(int sesTimeout) {
+        this.sesTimeout = sesTimeout;
+    }
+
+    public String getConnectString() {
+        return connectString;
+    }
+
+    public void setConnectString(String connectString) {
+        this.connectString = connectString;
+    }
+
+    /** */
+    private Serializable consistentId;
+
+    /** {@inheritDoc} */
+    @Nullable @Override public Serializable consistentId() throws IgniteSpiException {
+        if (consistentId == null) {
+            final Serializable cfgId = ignite.configuration().getConsistentId();
+
+            consistentId = cfgId;
+        }
+
+        return consistentId;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Returns a snapshot of the current topology with the local node left out.
+     */
+    @Override public Collection<ClusterNode> getRemoteNodes() {
+        // TODO
+        List<ClusterNode> nodes;
+
+        synchronized (curTop) {
+            // Start from an empty list: pre-filling it with the whole topology would return the
+            // local node as well and list every remote node twice.
+            nodes = new ArrayList<>(curTop.size());
+
+            for (ClusterNode node : curTop.values()) {
+                if (!locNode.id().equals(node.id()))
+                    nodes.add(node);
+            }
+        }
+
+        return nodes;
+    }
+
+    /** {@inheritDoc} */
+    @Override public ClusterNode getLocalNode() {
+        return locNode;
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public ClusterNode getNode(UUID nodeId) {
+        // TODO
+        synchronized (curTop) {
+            for (ClusterNode node : curTop.values()) {
+                if (node.id().equals(nodeId))
+                    return node;
+            }
+        }
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean pingNode(UUID nodeId) {
+        // TODO
+        return getNode(nodeId) != null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setNodeAttributes(Map<String, Object> attrs, IgniteProductVersion
ver) {
+        assert locNodeAttrs == null;
+        assert locNodeVer == null;
+
+        if (log.isDebugEnabled()) {
+            log.debug("Node attributes to set: " + attrs);
+            log.debug("Node version to set: " + ver);
+        }
+
+        locNodeAttrs = attrs;
+        locNodeVer = ver;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setListener(@Nullable DiscoverySpiListener lsnr) {
+        this.lsnr = lsnr;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setDataExchange(DiscoverySpiDataExchange exchange) {
+        this.exchange = exchange;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setMetricsProvider(DiscoveryMetricsProvider metricsProvider) {
+        this.metricsProvider = metricsProvider;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void disconnect() throws IgniteSpiException {
+        // TODO
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setAuthenticator(DiscoverySpiNodeAuthenticator auth) {
+        // TODO
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getGridStartTime() {
+        return gridStartTime;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void sendCustomEvent(DiscoverySpiCustomMessage msg) throws IgniteException
{
+        // TODO
+        throw new UnsupportedOperationException();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void failNode(UUID nodeId, @Nullable String warning) {
+        // TODO
+        throw new UnsupportedOperationException();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isClientMode() throws IllegalStateException {
+        // TODO
+        return false;
+    }
+
+    /**
+     *
+     */
+    private void initLocalNode() {
+        assert ignite != null;
+
+        locNode = new ZookeeperClusterNode(ignite.configuration().getNodeId(),
+            locNodeVer,
+            locNodeAttrs,
+            consistentId());
+
+        locNode.local(true);
+
+        DiscoverySpiListener lsnr = this.lsnr;
+
+        if (lsnr != null)
+            lsnr.onLocalNodeInitialized(locNode);
+
+        if (log.isDebugEnabled())
+            log.debug("Local node initialized: " + locNode);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Initializes the local node, connects to ZooKeeper, creates the base znodes when
+     * {@code IGNITE_PATH} does not exist yet, registers the data and children watches, publishes
+     * the local node under the join-history and alive paths, and then blocks until the local join
+     * event has been processed.
+     *
+     * @throws IgniteSpiException If any step fails or the thread is interrupted while waiting.
+     */
+    @Override public void spiStart(@Nullable String igniteInstanceName) throws IgniteSpiException {
+        try {
+            initLocalNode();
+
+            zk = new ZooKeeper(connectString, sesTimeout, zkWatcher);
+
+            // TODO: properly handle first node start and init after full cluster restart.
+            if (zk.exists(IGNITE_PATH, false) == null) {
+                log.info("Initialize Zookeeper nodes.");
+
+                List<Op> initOps = new ArrayList<>();
+
+                ZKClusterData clusterData = new ZKClusterData(U.currentTimeMillis());
+
+                initOps.add(Op.create(IGNITE_PATH, new byte[]{}, ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                    CreateMode.PERSISTENT));
+                initOps.add(Op.create(CLUSTER_PATH, marshal(clusterData), ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                    CreateMode.PERSISTENT));
+                initOps.add(Op.create(JOIN_HIST_PATH, new byte[]{}, ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                    CreateMode.PERSISTENT));
+                initOps.add(Op.create(ALIVE_NODES_PATH, new byte[]{}, ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                    CreateMode.PERSISTENT));
+                initOps.add(Op.create(EVENTS_PATH, new byte[]{}, ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                    CreateMode.PERSISTENT));
+
+                zk.multi(initOps);
+            }
+
+            ZKClusterData clusterData = unmarshal(zk.getData(CLUSTER_PATH, false, null));
+
+            gridStartTime = clusterData.gridStartTime;
+
+            // Register watches before publishing the local node so the local join is observed.
+            zk.getData(EVENTS_PATH, true, dataUpdateCallback, null);
+            zk.getChildren(ALIVE_NODES_PATH, true, nodesUpdateCallback, null);
+            zk.getChildren(JOIN_HIST_PATH, true, nodesUpdateCallback, null);
+
+            List<Op> joinOps = new ArrayList<>();
+
+            byte[] nodeData = marshal(locNode);
+
+            String zkNode = "/" + locNode.id().toString() + "-";
+
+            joinOps.add(Op.create(JOIN_HIST_PATH + zkNode, nodeData, ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                CreateMode.PERSISTENT_SEQUENTIAL));
+            joinOps.add(Op.create(ALIVE_NODES_PATH + zkNode, nodeData, ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                CreateMode.EPHEMERAL_SEQUENTIAL));
+
+            List<OpResult> res = zk.multi(joinOps);
+
+            log.info("Waiting for local join event.");
+
+            joinLatch.await();
+        }
+        catch (InterruptedException e) {
+            // Restore the interrupt status so callers up the stack can still observe it.
+            Thread.currentThread().interrupt();
+
+            throw new IgniteSpiException(e);
+        }
+        catch (Exception e) {
+            throw new IgniteSpiException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void spiStop() throws IgniteSpiException {
+        if (zk != null) {
+            try {
+                log.info("Close Zookeeper client.");
+
+                zk.close();
+            }
+            catch (Exception e) {
+                U.warn(log, "Failed to stop zookeeper client: " + e, e);
+            }
+
+            zk = null;
+        }
+    }
+
+    /**
+     * Unmarshals an object from bytes using the SPI's {@link JdkMarshaller}.
+     *
+     * @param data Marshalled bytes.
+     * @return Unmarshalled object.
+     * @throws IgniteException If unmarshalling fails; the error is logged first.
+     */
+    private <T> T unmarshal(byte[] data) {
+        try {
+            return marsh.unmarshal(data, null);
+        }
+        catch (Exception e) {
+            U.error(log, "Unmarshal error: " + e);
+
+            throw new IgniteException(e);
+        }
+    }
+
+    /**
+     * Marshals an object to bytes using the SPI's {@link JdkMarshaller}.
+     *
+     * @param obj Object to marshal.
+     * @return Marshalled bytes.
+     * @throws IgniteException If marshalling fails; the error is logged first.
+     */
+    private byte[] marshal(Object obj) {
+        try {
+            return marsh.marshal(obj);
+        }
+        catch (Exception e) {
+            U.error(log, "Marshal error: " + e);
+
+            throw new IgniteException(e);
+        }
+    }
+
+    /** */
+    private static final int ID_LEN = 36;
+
+    /**
+     *
+     */
+    static class ZKNodeData implements Serializable {
+        /** */
+        @GridToStringInclude
+        final long order;
+
+        /** */
+        @GridToStringInclude
+        final UUID nodeId;
+
+        /** */
+        transient ZookeeperClusterNode clusterNode;
+
+        /**
+         * @param order Node order.
+         * @param nodeId Node ID.
+         */
+        ZKNodeData(long order, UUID nodeId) {
+            this.order = order;
+            this.nodeId = nodeId;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(ZKNodeData.class, this);
+        }
+    }
+
+    /**
+     *
+     */
+    static class ZKAliveNodes implements Serializable {
+        /** */
+        @GridToStringInclude
+        final int ver;
+
+        /** */
+        @GridToStringInclude
+        final TreeMap<Long, ZKNodeData> nodesByOrder;
+
+        /** */
+        final TreeMap<UUID, ZKNodeData> nodesById;
+
+        /**
+         * @param ver
+         * @param nodesByOrder
+         */
+        ZKAliveNodes(int ver, TreeMap<Long, ZKNodeData> nodesByOrder) {
+            this.ver = ver;
+            this.nodesByOrder = nodesByOrder;
+
+            nodesById = new TreeMap<>();
+
+            for (ZKNodeData nodeData : nodesByOrder.values())
+                nodesById.put(nodeData.nodeId, nodeData);
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(ZKAliveNodes.class, this);
+        }
+    }
+
+    /**
+     * Splits a znode name into the node ID (its first {@code ID_LEN} characters) and the number
+     * that follows the one-character separator.
+     *
+     * @param path Zookeeper node path.
+     * @return Ignite node data whose order is the parsed number plus one.
+     */
+    private static ZKNodeData parseNodePath(String path) {
+        UUID id = UUID.fromString(path.substring(0, ID_LEN));
+
+        // Skip the separator character right after the ID and parse the remainder as a number.
+        int seq = Integer.parseInt(path.substring(ID_LEN + 1));
+
+        return new ZKNodeData(seq + 1, id);
+    }
+
+    /** */
+    private Map<Long, ZKNodeData> joinHist = new HashMap<>();
+
+    /** */
+    private boolean crd;
+
+    /** */
+    private ZKAliveNodes curAlive;
+
+    /**
+     * Children callback registered for both {@code JOIN_HIST_PATH} and {@code ALIVE_NODES_PATH}.
+     * <p>
+     * For the join history it records every child not seen before in {@code joinHist} and loads
+     * its marshalled {@link ZookeeperClusterNode}. For the alive path it builds a new
+     * {@link ZKAliveNodes} snapshot and passes it, together with the previous snapshot, to
+     * {@code generateEvents}.
+     */
+    class NodesUpdateCallback implements AsyncCallback.Children2Callback {
+        /**
+         * @param rc Result code of the asynchronous call; only logged here.
+         * @param path Path the children were requested for.
+         * @param ctx Context object passed to the asynchronous call; unused.
+         * @param children Child znode names; a {@code null} or empty list is ignored.
+         * @param stat Stat of the parent znode.
+         */
+        @Override public void processResult(int rc, String path, Object ctx, List<String>
children, Stat stat) {
+            if (children == null || children.isEmpty())
+                return;
+
+            if (path.equals(JOIN_HIST_PATH)) {
+                log.info("Join nodes changed [rc=" + rc +
+                    ", path=" + path +
+                    ", nodes=" + children +
+                    ", ver=" + (stat != null ? stat.getCversion() : null) + ']');
+
+                for (String child : children) {
+                    ZKNodeData data = parseNodePath(child);
+
+                    // Only fetch node data for join-history entries not seen before.
+                    if (joinHist.put(data.order, data) == null) {
+                        try {
+                            byte[] nodeData = zk.getData(path + "/" + child, null, null);
+
+                            assert nodeData.length > 0;
+
+                            data.clusterNode = unmarshal(nodeData);
+
+                            data.clusterNode.order(data.order);
+                        }
+                        catch (Exception e) {
+                            // NOTE(review): on failure the entry stays in joinHist with a null
+                            // clusterNode and is not fetched again.
+                            // TODO
+                            U.error(log, "Failed to get node data: " + e, e);
+                        }
+                    }
+                }
+            }
+            else if (path.equals(ALIVE_NODES_PATH)) {
+                log.info("Alive nodes changed [rc=" + rc +
+                    ", path=" + path +
+                    ", nodes=" + children +
+                    ", ver=" + (stat != null ? stat.getCversion() : null) + ']');
+
+                assert stat != null;
+
+                TreeMap<Long, ZKNodeData> nodes = new TreeMap<>();
+
+                for (String child : children) {
+                    ZKNodeData data = parseNodePath(child);
+
+                    nodes.put(data.order, data);
+                }
+
+                // The children version of the alive path is used as the snapshot version.
+                ZKAliveNodes newAlive = new ZKAliveNodes(stat.getCversion(), nodes);
+
+                generateEvents(curAlive, newAlive);
+
+                curAlive = newAlive;
+            }
+        }
+    }
+
+    /** */
+    private final TreeMap<Long, ZookeeperClusterNode> curTop = new TreeMap<>();
+
+    /**
+     * Compares the previous and the new alive-nodes snapshots and, when the local node is (or has
+     * just become) the coordinator, generates join/fail discovery events for the version steps
+     * between them and writes the resulting event set to {@code EVENTS_PATH}.
+     * <p>
+     * Does nothing if the local node is not among the new alive nodes or is not the coordinator.
+     *
+     * @param oldNodes Previously observed alive nodes, {@code null} if none were observed yet.
+     * @param newNodes Newly observed alive nodes.
+     */
+    private void generateEvents(ZKAliveNodes oldNodes, ZKAliveNodes newNodes) {
+        assert newNodes != null;
+
+        ZKNodeData locNode = newNodes.nodesById.get(this.locNode.id());
+
+        if (locNode == null)
+            return;
+
+        // The alive node with the smallest order acts as coordinator.
+        if (!crd && newNodes.nodesByOrder.firstKey() == locNode.order) {
+            log.info("Node become coordinator [oldNodes=" + oldNodes + ", curEvts=" + curEvts + ']');
+
+            if (curEvts != null) {
+                assert curEvts.aliveNodes != null;
+
+                // Continue from the alive-nodes state the last received events were based on.
+                oldNodes = curEvts.aliveNodes;
+
+                log.info("Node coordinator use old nodes from last events [oldNodes=" + oldNodes + ']');
+            }
+            else if (oldNodes == null) {
+                oldNodes = new ZKAliveNodes(0, new TreeMap<Long, ZKNodeData>());
+
+                log.info("Node coordinator init old nodes [oldNodes=" + oldNodes + ']');
+            }
+
+            curCrdEvts = curEvts;
+
+            crd = true;
+        }
+
+        if (!crd)
+            return;
+
+        log.info("Generate discovery events [oldNodes=" + oldNodes + ", newNodes=" + newNodes + ']');
+
+        if (oldNodes.ver == newNodes.ver)
+            return;
+
+        long nextJoinOrder = oldNodes.nodesByOrder.isEmpty() ? 1 : oldNodes.nodesByOrder.lastKey() + 1;
+
+        TreeMap<Integer, ZKDiscoveryEvent> evts = new TreeMap<>();
+
+        Set<Long> failed = new HashSet<>();
+
+        synchronized (curTop) {
+            for (int v = oldNodes.ver + 1; v <= newNodes.ver; v++) {
+                ZKNodeData data = joinHist.get(nextJoinOrder);
+
+                if (data != null) {
+                    curTop.put(data.clusterNode.order(), data.clusterNode);
+
+                    evts.put(v, new ZKDiscoveryEvent(EventType.EVT_NODE_JOINED,
+                        v,
+                        data.clusterNode,
+                        new ArrayList<>(curTop.values())));
+
+                    // The node joined and is already gone: emit its failure as the next version.
+                    if (!newNodes.nodesByOrder.containsKey(data.order)) {
+                        v++;
+
+                        ZookeeperClusterNode failedNode = curTop.remove(data.order);
+
+                        assert failedNode != null : data.order;
+
+                        evts.put(v, new ZKDiscoveryEvent(EventType.EVT_NODE_FAILED,
+                            v,
+                            failedNode,
+                            new ArrayList<>(curTop.values())));
+                    }
+
+                    nextJoinOrder++;
+                }
+                else {
+                    // No join recorded for this version step: report one not yet reported failure.
+                    for (ZKNodeData oldData : oldNodes.nodesByOrder.values()) {
+                        if (!failed.contains(oldData.order) &&
+                            !newNodes.nodesByOrder.containsKey(oldData.order)) {
+                            failed.add(oldData.order);
+
+                            ZookeeperClusterNode failedNode = curTop.remove(oldData.order);
+
+                            assert failedNode != null : oldData.order;
+
+                            evts.put(v, new ZKDiscoveryEvent(EventType.EVT_NODE_FAILED,
+                                v,
+                                failedNode,
+                                new ArrayList<>(curTop.values())));
+
+                            break;
+                        }
+                    }
+                }
+            }
+        }
+
+        log.info("Generated discovery events on coordinator: " + evts);
+
+        ZKDiscoveryEvents newEvents;
+
+        int expVer;
+
+        if (curCrdEvts == null) {
+            expVer = 0;
+
+            newEvents = new ZKDiscoveryEvents(newNodes, evts);
+        }
+        else {
+            TreeMap<Integer, ZKDiscoveryEvent> evts0 = new TreeMap<>(curCrdEvts.evts);
+
+            evts0.putAll(evts);
+
+            // Publish the merged history; passing only 'evts' would drop the events already
+            // published and leave 'evts0' unused.
+            newEvents = new ZKDiscoveryEvents(newNodes, evts0);
+
+            expVer = curCrdEvts.ver;
+        }
+
+        newEvents.ver = expVer + 1;
+
+        try {
+            zk.setData(EVENTS_PATH, marshal(newEvents), expVer);
+        }
+        catch (Exception e) {
+            // NOTE(review): curCrdEvts is still advanced below although the write failed - confirm
+            // this is intended.
+            U.error(log, "Events update error: " + e, e);
+        }
+
+        curCrdEvts = newEvents;
+    }
+
+    /** */
+    private ZKDiscoveryEvents curEvts;
+
+    /** */
+    private ZKDiscoveryEvents curCrdEvts;
+
+    /** */
+    private ZKDiscoveryEvent lastEvt;
+
+    /**
+     * Data callback registered for {@code EVENTS_PATH}.
+     * <p>
+     * Unmarshals the published {@link ZKDiscoveryEvents}, applies events not processed yet to the
+     * local topology (unless this node is the coordinator) and notifies the discovery listener.
+     * Until the first event has been fired, only the local node's own join event is accepted;
+     * that event also releases the join latch.
+     */
+    class DataUpdateCallback implements AsyncCallback.DataCallback {
+        /**
+         * @param rc Result code of the asynchronous call; not examined here.
+         * @param path Path the data was requested for.
+         * @param ctx Context object passed to the asynchronous call; unused.
+         * @param data Znode data; {@code null} or empty data is ignored.
+         * @param stat Stat of the znode.
+         */
+        @Override public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
+            log.info("Data changed [path=" + path + ", ver=" + (stat != null ? stat.getVersion() : null) + ']');
+
+            // ZooKeeper passes null data when the asynchronous call fails.
+            if (data == null || data.length == 0)
+                return;
+
+            if (path.equals(EVENTS_PATH)) {
+                assert stat != null;
+
+                ZKDiscoveryEvents newEvts = unmarshal(data);
+
+                newEvts.ver = stat.getVersion();
+
+                for (ZKDiscoveryEvent e : newEvts.evts.values()) {
+                    boolean fireEvt;
+                    boolean locJoin = false;
+
+                    if (lastEvt == null) {
+                        // NOTE(review): the order is set for every event scanned before the local
+                        // join is found, not only for the local join event itself.
+                        locNode.order(e.node.order());
+
+                        locJoin = e.evtType == EventType.EVT_NODE_JOINED && e.node.id().equals(locNode.id());
+
+                        fireEvt = locJoin;
+                    }
+                    else
+                        fireEvt = e.topVer > lastEvt.topVer;
+
+                    if (fireEvt) {
+                        assert lastEvt == null || lastEvt.topVer + 1 == e.topVer :
+                            "lastEvt=" + lastEvt + ", nextEvt=" + e;
+
+                        // On the coordinator the topology is already updated by generateEvents().
+                        if (!crd) {
+                            if (locJoin) {
+                                for (ZookeeperClusterNode node : e.allNodes) {
+                                    assert node.order() > 0 : node;
+
+                                    Object old = curTop.put(node.order(), node);
+
+                                    assert old == null : node;
+                                }
+                            }
+                            else {
+                                switch (e.evtType) {
+                                    case EventType.EVT_NODE_JOINED: {
+                                        ZookeeperClusterNode node = e.node;
+
+                                        Object old = curTop.put(node.order(), node);
+
+                                        assert old == null : node;
+
+                                        break;
+                                    }
+
+                                    case EventType.EVT_NODE_FAILED: {
+                                        ZookeeperClusterNode node = e.node;
+
+                                        Object failedNode = curTop.remove(node.order());
+
+                                        assert failedNode != null : node;
+
+                                        break;
+                                    }
+
+                                    default:
+                                        assert false : e;
+                                }
+                            }
+                        }
+
+                        log.info("Received discovery event, notify listener: " + e);
+
+                        List<ClusterNode> allNodes = allNodesForEvent(e.allNodes);
+
+                        lsnr.onDiscovery(e.evtType, e.topVer, e.node, allNodes, null, null);
+
+                        if (locJoin) {
+                            log.info("Local node joined: " + e);
+
+                            joinLatch.countDown();
+                        }
+
+                        lastEvt = e;
+                    }
+                }
+
+                curEvts = newEvts;
+            }
+        }
+    }
+
+    private List<ClusterNode> allNodesForEvent(List<ZookeeperClusterNode> allNodes)
{
+        List<ClusterNode> res = new ArrayList<>(allNodes.size());
+        // Copy the event's nodes into a new List<ClusterNode>, keeping their order.
+        for (int i = 0; i < allNodes.size(); i++) {
+            ZookeeperClusterNode node = allNodes.get(i);
+            // Updates the passed node object itself: its local flag is set from an id comparison with locNode.
+            node.local(locNode.id().equals(node.id()));
+
+            res.add(node);
+        }
+
+        return res;
+    }
+
+    /**
+     * Discovery events data; {@code DataUpdateCallback} unmarshals it from the {@code EVENTS_PATH} node data.
+     */
+    static class ZKDiscoveryEvents implements Serializable {
+        /** Data version; {@code DataUpdateCallback} sets it from {@code Stat.getVersion()} after unmarshalling. */
+        @GridToStringInclude
+        int ver;
+
+        /** Alive nodes data passed to the constructor (structure of {@code ZKAliveNodes} is not visible here). */
+        @GridToStringInclude
+        final ZKAliveNodes aliveNodes;
+
+        /** Events in ascending key order; the key is presumably the topology version - TODO confirm. */
+        @GridToStringInclude
+        final TreeMap<Integer, ZKDiscoveryEvent> evts;
+
+        /**
+         * @param aliveNodes Alive nodes data.
+         * @param evts Events map.
+         */
+        ZKDiscoveryEvents(ZKAliveNodes aliveNodes, TreeMap<Integer, ZKDiscoveryEvent>
evts) {
+            this.aliveNodes = aliveNodes;
+            this.evts = evts;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(ZKDiscoveryEvents.class, this);
+        }
+    }
+
+    /**
+     * Single discovery event: its type, topology version, the affected node and the node list given to the listener.
+     */
+    static class ZKDiscoveryEvent implements Serializable {
+        /** Event type; {@code DataUpdateCallback} handles {@code EVT_NODE_JOINED} and {@code EVT_NODE_FAILED}. */
+        @GridToStringInclude
+        final int evtType;
+
+        /** Node the event is about (the joined or failed node). */
+        @GridToStringInclude
+        final ZookeeperClusterNode node;
+
+        /** Node list reported to the discovery listener together with this event. */
+        @GridToStringInclude
+        final List<ZookeeperClusterNode> allNodes;
+
+        /** Topology version; consecutively delivered events are asserted to differ by exactly one. */
+        @GridToStringInclude
+        final int topVer;
+
+        /**
+         * @param evtType Event type.
+         * @param topVer Topology version of the event.
+         * @param node Node the event is about.
+         * @param allNodes Node list to report with the event.
+         */
+        ZKDiscoveryEvent(int evtType, int topVer, ZookeeperClusterNode node, List<ZookeeperClusterNode>
allNodes) {
+            this.evtType = evtType;
+            this.topVer = topVer;
+            this.node = node;
+            this.allNodes = allNodes;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(ZKDiscoveryEvent.class, this);
+        }
+    }
+
+
+    /**
+     * Watcher that re-reads the children or data of the changed path, passing {@code true} as the watch argument.
+     */
+    private class ZookeeperWatcher implements Watcher {
+        /** {@inheritDoc} */
+        @Override public void process(WatchedEvent event) {
+            log.info("Process event [type=" + event.getType() + ", state=" + event.getState()
+ ", path=" + event.getPath() + ']');
+            // Results are handed to nodesUpdateCallback / dataUpdateCallback; other event types are only logged.
+            if (event.getType() == Event.EventType.NodeChildrenChanged) {
+                zk.getChildren(event.getPath(), true, nodesUpdateCallback, null);
+            } else if (event.getType() == Event.EventType.NodeDataChanged) {
+                zk.getData(event.getPath(), true, dataUpdateCallback, null);
+            }
+        }
+    }
+
+    /**
+     * Serializable holder for the grid start time.
+     */
+    private static class ZKClusterData implements Serializable {
+        /** Grid start time (unit not visible here; presumably epoch milliseconds - TODO confirm). */
+        private long gridStartTime;
+
+        /**
+         * @param gridStartTime Grid start time.
+         */
+        public ZKClusterData(long gridStartTime) {
+            this.gridStartTime = gridStartTime;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/48175cf3/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java
b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java
new file mode 100644
index 0000000..7197947
--- /dev/null
+++ b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk;
+
+import java.util.List;
+import org.apache.curator.test.TestingCluster;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Basic {@link ZookeeperDiscoverySpi} test: starts and stops nodes against a Curator {@link TestingCluster}.
+ */
+public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
+    /** ZooKeeper test cluster; created in {@link #beforeTest()} and closed in {@link #afterTest()}. */
+    private TestingCluster zkCluster;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws
Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        cfg.setConsistentId(igniteInstanceName);
+
+        assert zkCluster != null;
+
+        ZookeeperDiscoverySpi zkSpi = new ZookeeperDiscoverySpi();
+
+        zkSpi.setConnectString(zkCluster.getConnectString());
+
+        cfg.setDiscoverySpi(zkSpi);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        zkCluster = new TestingCluster(1);
+        zkCluster.start();
+
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        if (zkCluster != null) {
+            zkCluster.close();
+
+            zkCluster = null;
+        }
+
+        super.afterTest();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testStartStop() throws Exception {
+        startGridsMultiThreaded(5, false);
+
+        waitForTopology(5);
+
+        stopGrid(0);
+
+        waitForTopology(4);
+    }
+
+    private void waitForTopology(final int expSize) throws Exception {
+        assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                List<Ignite> nodes = G.allGrids();
+                // First wait until G.allGrids() returns exactly expSize instances.
+                if (nodes.size() != expSize) {
+                    info("Wait all nodes [size=" + nodes.size() + ", exp=" + expSize + ']');
+
+                    return false;
+                }
+                // Then every instance must itself report expSize nodes in its cluster view.
+                for (Ignite node: nodes) {
+                    int sizeOnNode = node.cluster().nodes().size();
+
+                    if (sizeOnNode != expSize) {
+                        info("Wait for size on node [node=" + node.name() + ", size=" + sizeOnNode
+ ", exp=" + expSize + ']');
+
+                        return false;
+                    }
+                }
+
+                return true;
+            }
+        }, 5000));
+    }
+}


Mime
View raw message