ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [28/50] [abbrv] incubator-ignite git commit: # ignite-301
Date Fri, 20 Feb 2015 08:43:45 GMT
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a67b72ca/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterGroupAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterGroupAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterGroupAdapter.java
new file mode 100644
index 0000000..9c736cf
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterGroupAdapter.java
@@ -0,0 +1,886 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cluster;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.executor.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.*;
+
+import static org.apache.ignite.internal.IgniteNodeAttributes.*;
+
+/**
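+ * {@link ClusterGroupEx} implementation backed by either a node predicate or an explicit set of node IDs.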
+ */
+public class ClusterGroupAdapter implements ClusterGroupEx, Externalizable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Kernal context. */
+    protected transient GridKernalContext ctx;
+
+    /** Parent projection. */
+    private transient ClusterGroup parent;
+
+    /** Compute. */
+    private transient IgniteComputeImpl compute;
+
+    /** Messaging. */
+    private transient IgniteMessagingImpl messaging;
+
+    /** Events. */
+    private transient IgniteEvents evts;
+
+    /** Services. */
+    private transient IgniteServices svcs;
+
+    /** Grid name. */
+    private String gridName;
+
+    /** Subject ID. */
+    private UUID subjId;
+
+    /** Projection predicate. */
+    protected IgnitePredicate<ClusterNode> p;
+
+    /** Node IDs. */
+    private Set<UUID> ids;
+
+    /**
+     * Required by {@link Externalizable}.
+     */
+    public ClusterGroupAdapter() {
+        // No-op.
+    }
+
+    /**
+     * @param parent Parent of this projection.
+     * @param ctx Grid kernal context.
+     * @param subjId Subject ID.
+     * @param p Predicate.
+     */
+    protected ClusterGroupAdapter(@Nullable ClusterGroup parent,
+        @Nullable GridKernalContext ctx,
+        @Nullable UUID subjId,
+        @Nullable IgnitePredicate<ClusterNode> p)
+    {
+        this.parent = parent;
+        // Parent is assigned first: setKernalContext() only fills it in when it is still null.
+        if (ctx != null)
+            setKernalContext(ctx);
+
+        this.subjId = subjId;
+        this.p = p;
+        // Predicate-based group: there is no explicit node ID set.
+        ids = null;
+    }
+
+    /**
+     * @param parent Parent of this projection.
+     * @param ctx Grid kernal context.
+     * @param subjId Subject ID.
+     * @param ids Node IDs.
+     */
+    protected ClusterGroupAdapter(@Nullable ClusterGroup parent,
+        @Nullable GridKernalContext ctx,
+        @Nullable UUID subjId,
+        Set<UUID> ids)
+    {
+        this.parent = parent;
+
+        if (ctx != null)
+            setKernalContext(ctx);
+
+        assert ids != null;
+
+        this.subjId = subjId;
+        this.ids = ids;
+
+        p = F.nodeForNodeIds(ids);
+    }
+
+    /**
+     * @param parent Parent of this projection.
+     * @param ctx Grid kernal context.
+     * @param subjId Subject ID.
+     * @param p Predicate.
+     * @param ids Node IDs.
+     */
+    private ClusterGroupAdapter(@Nullable ClusterGroup parent,
+        @Nullable GridKernalContext ctx,
+        @Nullable UUID subjId,
+        @Nullable IgnitePredicate<ClusterNode> p,
+        Set<UUID> ids)
+    {
+        this.parent = parent;
+
+        if (ctx != null)
+            setKernalContext(ctx);
+
+        this.subjId = subjId;
+        this.p = p;
+        this.ids = ids;
+        // No predicate supplied: derive one from the node IDs, if present.
+        if (p == null && ids != null)
+            this.p = F.nodeForNodeIds(ids);
+    }
+
+    /**
+     * Acquires the gateway read lock of the kernal context: <tt>ctx.gateway().readLock()</tt>.
+     */
+    protected void guard() {
+        assert ctx != null;
+
+        ctx.gateway().readLock();
+    }
+
+    /**
+     * Releases the gateway read lock of the kernal context: <tt>ctx.gateway().readUnlock()</tt>.
+     */
+    protected void unguard() {
+        assert ctx != null;
+
+        ctx.gateway().readUnlock();
+    }
+
+    /**
+     * Sets kernal context.
+     * One-shot (asserted); also defaults {@code parent} to the grid's cluster and stores the grid name.
+     * @param ctx Kernal context to set.
+     */
+    public void setKernalContext(GridKernalContext ctx) {
+        assert ctx != null;
+        assert this.ctx == null;
+
+        this.ctx = ctx;
+
+        if (parent == null)
+            parent = ctx.grid().cluster();
+
+        gridName = ctx.gridName();
+    }
+
+    /** {@inheritDoc} */
+    @Override public final Ignite ignite() {
+        assert ctx != null;
+
+        guard();
+
+        try {
+            return ctx.grid();
+        }
+        finally {
+            unguard();
+        }
+    }
+
+    /**
+     * @return Lazily created {@link org.apache.ignite.IgniteCompute} for this projection.
+     */
+    public final IgniteCompute compute() {
+        if (compute != null)
+            return compute;
+
+        assert ctx != null;
+
+        compute = new IgniteComputeImpl(ctx, this, subjId, false);
+        return compute;
+    }
+
+    /**
+     * @return Lazily created {@link org.apache.ignite.IgniteMessaging} for this projection.
+     */
+    public final IgniteMessaging message() {
+        if (messaging != null)
+            return messaging;
+
+        assert ctx != null;
+
+        messaging = new IgniteMessagingImpl(ctx, this, false);
+        return messaging;
+    }
+
+    /**
+     * @return Lazily created {@link org.apache.ignite.IgniteEvents} for this projection.
+     */
+    public final IgniteEvents events() {
+        if (evts != null)
+            return evts;
+
+        assert ctx != null;
+
+        evts = new IgniteEventsImpl(ctx, this, false);
+        return evts;
+    }
+
+    /**
+     * @return Lazily created {@link org.apache.ignite.IgniteServices} for this projection.
+     */
+    public IgniteServices services() {
+        if (svcs != null)
+            return svcs;
+
+        assert ctx != null;
+
+        svcs = new IgniteServicesImpl(ctx, this, false);
+        return svcs;
+    }
+
+    /**
+     * @return {@link ExecutorService} for this projection.
+     */
+    public ExecutorService executorService() {
+        assert ctx != null;
+
+        return new GridExecutorService(this, ctx);
+    }
+
+    /** {@inheritDoc} */
+    @Override public final ClusterMetrics metrics() {
+        guard();
+
+        try {
+            if (nodes().isEmpty())
+                throw U.convertException(U.emptyTopologyException());
+
+            return new ClusterMetricsSnapshot(this);
+        }
+        finally {
+            unguard();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<ClusterNode> nodes() {
+        guard();
+
+        try {
+            if (ids != null) {
+                if (ids.isEmpty())
+                    return Collections.emptyList();
+                else if (ids.size() == 1) {
+                    ClusterNode node = ctx.discovery().node(F.first(ids));
+
+                    return node != null ? Collections.singleton(node) : Collections.<ClusterNode>emptyList();
+                }
+                else {
+                    Collection<ClusterNode> nodes = new ArrayList<>(ids.size());
+
+                    for (UUID id : ids) {
+                        ClusterNode node = ctx.discovery().node(id);
+
+                        if (node != null)
+                            nodes.add(node);
+                    }
+
+                    return nodes;
+                }
+            }
+            else {
+                Collection<ClusterNode> all = ctx.discovery().allNodes();
+
+                return p != null ? F.view(all, p) : all;
+            }
+        }
+        finally {
+            unguard();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public final ClusterNode node(UUID id) {
+        A.notNull(id, "id");
+
+        guard();
+
+        try {
+            if (ids != null)
+                return ids.contains(id) ? ctx.discovery().node(id) : null;
+            else {
+                ClusterNode node = ctx.discovery().node(id);
+
+                return node != null && (p == null || p.apply(node)) ? node : null;
+            }
+        }
+        finally {
+            unguard();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public ClusterNode node() {
+        return F.first(nodes());
+    }
+
+    /** {@inheritDoc} */
+    @Override public final IgnitePredicate<ClusterNode> predicate() {
+        return p != null ? p : F.<ClusterNode>alwaysTrue();
+    }
+
+    /** {@inheritDoc} */
+    @Override public final ClusterGroup forPredicate(IgnitePredicate<ClusterNode> p) {
+        A.notNull(p, "p");
+
+        guard();
+
+        try {
+            return new ClusterGroupAdapter(this, ctx, subjId, this.p != null ? F.and(p, this.p) : p);
+        }
+        finally {
+            unguard();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public final ClusterGroup forAttribute(String name, @Nullable final String val) {
+        A.notNull(name, "name");
+        // A null value selects every node that merely has the attribute (see AttributeFilter).
+        return forPredicate(new AttributeFilter(name, val));
+    }
+
+    /** {@inheritDoc} */
+    @Override public final ClusterGroup forNode(ClusterNode node, ClusterNode... nodes) {
+        A.notNull(node, "node");
+
+        guard();
+
+        try {
+            Set<UUID> nodeIds;
+
+            if (F.isEmpty(nodes))
+                nodeIds = contains(node) ? Collections.singleton(node.id()) : Collections.<UUID>emptySet();
+            else {
+                nodeIds = U.newHashSet(nodes.length + 1);
+
+                for (ClusterNode n : nodes)
+                    if (contains(n))
+                        nodeIds.add(n.id());
+
+                if (contains(node))
+                    nodeIds.add(node.id());
+            }
+
+            return new ClusterGroupAdapter(this, ctx, subjId, nodeIds);
+        }
+        finally {
+            unguard();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public final ClusterGroup forNodes(Collection<? extends ClusterNode> nodes) {
+        A.notEmpty(nodes, "nodes");
+
+        guard();
+
+        try {
+            Set<UUID> nodeIds = U.newHashSet(nodes.size());
+
+            for (ClusterNode n : nodes)
+                if (contains(n))
+                    nodeIds.add(n.id());
+
+            return new ClusterGroupAdapter(this, ctx, subjId, nodeIds);
+        }
+        finally {
+            unguard();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public final ClusterGroup forNodeId(UUID id, UUID... ids) {
+        A.notNull(id, "id");
+
+        guard();
+
+        try {
+            Set<UUID> nodeIds;
+
+            if (F.isEmpty(ids))
+                nodeIds = contains(id) ? Collections.singleton(id) : Collections.<UUID>emptySet();
+            else {
+                nodeIds = U.newHashSet(ids.length + 1);
+
+                // Membership must be tested for each vararg ID itself, not for the first ID.
+                for (UUID id0 : ids)
+                    if (contains(id0))
+                        nodeIds.add(id0);
+
+                if (contains(id))
+                    nodeIds.add(id);
+            }
+
+            return new ClusterGroupAdapter(this, ctx, subjId, nodeIds);
+        }
+        finally {
+            unguard();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public final ClusterGroup forNodeIds(Collection<UUID> ids) {
+        A.notEmpty(ids, "ids");
+
+        guard();
+
+        try {
+            Set<UUID> nodeIds = U.newHashSet(ids.size());
+
+            for (UUID id : ids) {
+                if (contains(id))
+                    nodeIds.add(id);
+            }
+
+            return new ClusterGroupAdapter(this, ctx, subjId, nodeIds);
+        }
+        finally {
+            unguard();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public final ClusterGroup forOthers(ClusterNode node, ClusterNode... nodes) {
+        A.notNull(node, "node");
+
+        return forOthers(F.concat(false, node.id(), F.nodeIds(Arrays.asList(nodes))));
+    }
+
+    /** {@inheritDoc} */
+    @Override public ClusterGroup forOthers(ClusterGroup prj) {
+        A.notNull(prj, "prj");
+
+        if (ids != null) {
+            guard();
+
+            try {
+                Set<UUID> nodeIds = U.newHashSet(ids.size());
+
+                for (UUID id : ids) {
+                    ClusterNode n = node(id);
+
+                    if (n != null && !prj.predicate().apply(n))
+                        nodeIds.add(id);
+                }
+
+                return new ClusterGroupAdapter(this, ctx, subjId, nodeIds);
+            }
+            finally {
+                unguard();
+            }
+        }
+        else
+            return forPredicate(F.not(prj.predicate()));
+    }
+
+    /** {@inheritDoc} */
+    @Override public final ClusterGroup forRemotes() {
+        return forOthers(Collections.singleton(ctx.localNodeId()));
+    }
+
+    /**
+     * @param excludeIds Node IDs.
+     * @return New projection.
+     */
+    private ClusterGroup forOthers(Collection<UUID> excludeIds) {
+        assert excludeIds != null;
+
+        if (ids != null) {
+            guard();
+
+            try {
+                Set<UUID> nodeIds = U.newHashSet(ids.size());
+
+                for (UUID id : ids) {
+                    if (!excludeIds.contains(id))
+                        nodeIds.add(id);
+                }
+
+                return new ClusterGroupAdapter(this, ctx, subjId, nodeIds);
+            }
+            finally {
+                unguard();
+            }
+        }
+        else
+            return forPredicate(new OthersFilter(excludeIds));
+    }
+
+    /** {@inheritDoc} */
+    @Override public final ClusterGroup forCacheNodes(@Nullable String cacheName) {
+        return forPredicate(new CachesFilter(cacheName, null));
+    }
+
+    /** {@inheritDoc} */
+    @Override public final ClusterGroup forDataNodes(@Nullable String cacheName) {
+        return forPredicate(new CachesFilter(cacheName, CachesFilter.DATA_MODES));
+    }
+
+    /** {@inheritDoc} */
+    @Override public final ClusterGroup forClientNodes(@Nullable String cacheName) {
+        return forPredicate(new CachesFilter(cacheName, CachesFilter.CLIENT_MODES));
+    }
+
+    /** {@inheritDoc} */
+    @Override public final ClusterGroup forStreamer(@Nullable String streamerName, @Nullable String... streamerNames) {
+        return forPredicate(new StreamersFilter(streamerName, streamerNames));
+    }
+
+    /** {@inheritDoc} */
+    @Override public ClusterGroup forCacheNodes(@Nullable String cacheName,
+        Set<CacheDistributionMode> distributionModes) {
+        return forPredicate(new CachesFilter(cacheName, distributionModes));
+    }
+
+    /** {@inheritDoc} */
+    @Override public final ClusterGroup forHost(ClusterNode node) {
+        A.notNull(node, "node");
+
+        String macs = node.attribute(ATTR_MACS);
+
+        assert macs != null;
+
+        return forAttribute(ATTR_MACS, macs);
+    }
+
+    /** {@inheritDoc} */
+    @Override public final ClusterGroup forDaemons() {
+        return forPredicate(new DaemonFilter());
+    }
+
+    /** {@inheritDoc} */
+    @Override public final ClusterGroup forRandom() {
+        return ids != null ? forNodeId(F.rand(ids)) : forNode(F.rand(nodes()));
+    }
+
+    /** {@inheritDoc} */
+    @Override public ClusterGroup forOldest() {
+        return new AgeProjection(this, true);
+    }
+
+    /** {@inheritDoc} */
+    @Override public ClusterGroup forYoungest() {
+        return new AgeProjection(this, false);
+    }
+
+    /** {@inheritDoc} */
+    @Override public ClusterGroupEx forSubjectId(UUID subjId) {
+        if (subjId == null)
+            return this;
+
+        guard();
+
+        try {
+            return ids != null ? new ClusterGroupAdapter(this, ctx, subjId, ids) :
+                new ClusterGroupAdapter(this, ctx, subjId, p);
+        }
+        finally {
+            unguard();
+        }
+    }
+
+    /**
+     * @param n Node.
+     * @return Whether node belongs to this projection.
+     */
+    private boolean contains(ClusterNode n) {
+        assert n != null;
+
+        return ids != null ? ids.contains(n.id()) : p == null || p.apply(n);
+    }
+
+    /**
+     * @param id ID of the node to check.
+     * @return {@code true} if the ID is in the ID set or, without one, resolves to a node the predicate accepts.
+     */
+    private boolean contains(UUID id) {
+        assert id != null;
+
+        if (ids != null)
+            return ids.contains(id);
+
+        // No ID set: resolve the node through discovery and run it through the predicate, if any.
+        ClusterNode resolved = ctx.discovery().node(id);
+
+        return resolved != null && (p == null || p.apply(resolved));
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        U.writeString(out, gridName);
+        U.writeUuid(out, subjId);
+
+        out.writeBoolean(ids != null);
+
+        if (ids != null)
+            out.writeObject(ids);
+        else
+            out.writeObject(p);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        gridName = U.readString(in);
+        subjId = U.readUuid(in);
+
+        if (in.readBoolean())
+            ids = (Set<UUID>)in.readObject();
+        else
+            p = (IgnitePredicate<ClusterNode>)in.readObject();
+    }
+
+    /**
+     * Reconstructs object on unmarshalling.
+     * NOTE(review): with no IDs and no predicate this returns the kernal {@code g}, not {@code grp} - confirm.
+     * @return Reconstructed object, bound to the grid looked up by {@code gridName}.
+     * @throws ObjectStreamException Thrown in case of unmarshalling error.
+     */
+    protected Object readResolve() throws ObjectStreamException {
+        try {
+            IgniteKernal g = IgnitionEx.gridx(gridName);
+
+            ClusterGroup grp = g.cluster();
+
+            return ids != null ? new ClusterGroupAdapter(grp, g.context(), subjId, ids) :
+                p != null ? new ClusterGroupAdapter(grp, g.context(), subjId, p) : g;
+        }
+        catch (IllegalStateException e) {
+            throw U.withCause(new InvalidObjectException(e.getMessage()), e);
+        }
+    }
+
+    /**
+     */
+    private static class CachesFilter implements IgnitePredicate<ClusterNode> {
+        /** */
+        private static final Set<CacheDistributionMode> DATA_MODES = EnumSet.of(CacheDistributionMode.NEAR_PARTITIONED,
+            CacheDistributionMode.PARTITIONED_ONLY);
+
+        /** */
+        private static final Set<CacheDistributionMode> CLIENT_MODES = EnumSet.of(CacheDistributionMode.CLIENT_ONLY,
+            CacheDistributionMode.NEAR_ONLY);
+
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** Cache name. */
+        private final String cacheName;
+
+        /** */
+        private final Set<CacheDistributionMode> distributionMode;
+
+        /**
+         * @param cacheName Cache name.
+         * @param distributionMode Filter by {@link org.apache.ignite.configuration.CacheConfiguration#getDistributionMode()}.
+         */
+        private CachesFilter(@Nullable String cacheName, @Nullable Set<CacheDistributionMode> distributionMode) {
+            this.cacheName = cacheName;
+            this.distributionMode = distributionMode;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean apply(ClusterNode n) {
+            GridCacheAttributes[] cacheAttrs = n.attribute(ATTR_CACHE);
+            if (cacheAttrs == null)
+                return false;
+
+            for (GridCacheAttributes ca : cacheAttrs) {
+                if (!Objects.equals(cacheName, ca.cacheName()))
+                    continue;
+                if (distributionMode == null || distributionMode.contains(ca.partitionedTaxonomy()))
+                    return true;
+            }
+            return false;
+        }
+    }
+
+    /**
+     */
+    private static class StreamersFilter implements IgnitePredicate<ClusterNode> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** Streamer name. */
+        private final String streamerName;
+
+        /** Streamer names. */
+        private final String[] streamerNames;
+
+        /**
+         * @param streamerName Streamer name.
+         * @param streamerNames Streamer names.
+         */
+        private StreamersFilter(@Nullable String streamerName, @Nullable String[] streamerNames) {
+            this.streamerName = streamerName;
+            this.streamerNames = streamerNames;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean apply(ClusterNode n) {
+            if (!U.hasStreamer(n, streamerName))
+                 return false;
+
+            if (!F.isEmpty(streamerNames))
+                for (String sn : streamerNames)
+                    if (!U.hasStreamer(n, sn))
+                        return false;
+
+            return true;
+        }
+    }
+
+    /**
+     */
+    private static class AttributeFilter implements IgnitePredicate<ClusterNode> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** Name. */
+        private final String name;
+
+        /** Value. */
+        private final String val;
+
+        /**
+         * @param name Name.
+         * @param val Value.
+         */
+        private AttributeFilter(String name, String val) {
+            this.name = name;
+            this.val = val;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean apply(ClusterNode n) {
+            return val == null ? n.attributes().containsKey(name) : val.equals(n.attribute(name));
+        }
+    }
+
+    /**
+     */
+    private static class DaemonFilter implements IgnitePredicate<ClusterNode> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** {@inheritDoc} */
+        @Override public boolean apply(ClusterNode n) {
+            return n.isDaemon();
+        }
+    }
+
+    /**
+     */
+    private static class OthersFilter implements IgnitePredicate<ClusterNode> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** */
+        private final Collection<UUID> nodeIds;
+
+        /**
+         * @param nodeIds Node IDs.
+         */
+        private OthersFilter(Collection<UUID> nodeIds) {
+            this.nodeIds = nodeIds;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean apply(ClusterNode n) {
+            return !nodeIds.contains(n.id());
+        }
+    }
+
+    /**
+     * Age-based projection.
+     */
+    private static class AgeProjection extends ClusterGroupAdapter {
+        /** Serialization version. */
+        private static final long serialVersionUID = 0L;
+
+        /** Oldest flag. */
+        private boolean isOldest;
+
+        /** Selected node. */
+        private volatile ClusterNode node;
+
+        /** Last topology version. */
+        private volatile long lastTopVer;
+
+        /**
+         * Required for {@link Externalizable}.
+         */
+        public AgeProjection() {
+            // No-op.
+        }
+
+        /**
+         * @param prj Parent projection.
+         * @param isOldest Oldest flag.
+         */
+        private AgeProjection(ClusterGroupAdapter prj, boolean isOldest) {
+            super(prj.parent, prj.ctx, prj.subjId, prj.p, prj.ids);
+
+            this.isOldest = isOldest;
+
+            reset();
+        }
+
+        /**
+         * Re-selects the oldest or youngest node and records the topology version it was selected for.
+         */
+        private synchronized void reset() {
+            guard();
+            try {
+                long topVer = ctx.discovery().topologyVersion();
+                this.node = isOldest ? U.oldest(super.nodes(), null) : U.youngest(super.nodes(), null);
+                // Publish the version last: unsynchronized readers compare it before reading the node.
+                lastTopVer = topVer;
+            }
+            finally {
+                unguard();
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public ClusterNode node() {
+            if (ctx.discovery().topologyVersion() != lastTopVer)
+                reset();
+
+            return node;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Collection<ClusterNode> nodes() {
+            if (ctx.discovery().topologyVersion() != lastTopVer)
+                reset();
+
+            ClusterNode node = this.node;
+
+            return node == null ? Collections.<ClusterNode>emptyList() : Collections.singletonList(node);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a67b72ca/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterGroupEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterGroupEx.java b/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterGroupEx.java
new file mode 100644
index 0000000..f11b781
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterGroupEx.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cluster;
+
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cluster.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+
+/**
+ * Internal projection interface: extends {@link ClusterGroup} with subject-ID and cache-mode projections.
+ */
+public interface ClusterGroupEx extends ClusterGroup {
+    /**
+     * Creates projection for specified subject ID.
+     *
+     * @param subjId Subject ID.
+     * @return Internal projection.
+     */
+    public ClusterGroupEx forSubjectId(UUID subjId);
+
+    /**
+     * @param cacheName Cache name (may be {@code null}).
+     * @param distributionModes Cache distribution modes that a node's cache must have to be included.
+     * @return Cluster group.
+     */
+    public ClusterGroup forCacheNodes(@Nullable String cacheName, Set<CacheDistributionMode> distributionModes);
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a67b72ca/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterNodeLocalMapImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterNodeLocalMapImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterNodeLocalMapImpl.java
new file mode 100644
index 0000000..948cae9
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterNodeLocalMapImpl.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cluster;
+
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.jdk8.backport.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.util.concurrent.*;
+
+/**
+ * {@link ClusterNodeLocalMap} implementation on top of {@link ConcurrentHashMap8}.
+ */
+public class ClusterNodeLocalMapImpl<K, V> extends ConcurrentHashMap8<K, V> implements ClusterNodeLocalMap<K, V>,
+    Externalizable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
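+    /** Grid name handed over from {@code readExternal} to {@code readResolve} on the unmarshalling thread. */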
+    private static final ThreadLocal<String> stash = new ThreadLocal<>();
+
+    /** */
+    private GridKernalContext ctx;
+
+    /**
+     * No-arg constructor is required by externalization.
+     */
+    public ClusterNodeLocalMapImpl() {
+        // No-op.
+    }
+
+    /**
+     *
+     * @param ctx Kernal context.
+     */
+    ClusterNodeLocalMapImpl(GridKernalContext ctx) {
+        assert ctx != null;
+
+        this.ctx = ctx;
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public V addIfAbsent(K key, @Nullable Callable<V> dflt) {
+        return F.addIfAbsent(this, key, dflt);
+    }
+
+    /** {@inheritDoc} */
+    @Override public V addIfAbsent(K key, V val) {
+        return F.addIfAbsent(this, key, val);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        U.writeString(out, ctx.gridName());
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        stash.set(U.readString(in));
+    }
+
+    /**
+     * Reconstructs object on unmarshalling.
+     * Looks up the grid by the name stashed in {@code readExternal} and returns that grid's node-local map.
+     * @return Reconstructed object.
+     * @throws ObjectStreamException Thrown in case of unmarshalling error.
+     */
+    protected Object readResolve() throws ObjectStreamException {
+        try {
+            return IgnitionEx.gridx(stash.get()).cluster().nodeLocalMap();
+        }
+        catch (IllegalStateException e) {
+            throw U.withCause(new InvalidObjectException(e.getMessage()), e);
+        }
+        finally {
+            stash.remove();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(ClusterNodeLocalMapImpl.class, this);
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a67b72ca/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterAsyncImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterAsyncImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterAsyncImpl.java
new file mode 100644
index 0000000..2095e70
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterAsyncImpl.java
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cluster;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.util.lang.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * {@link IgniteCluster} facade over {@link IgniteClusterImpl}: {@code startNodes} goes through {@code saveOrGet}, every other method here delegates.
+ */
+public class IgniteClusterAsyncImpl extends AsyncSupportAdapter<IgniteCluster> implements IgniteCluster {
+    /** Wrapped cluster that every method of this class forwards to. */
+    private final IgniteClusterImpl cluster;
+
+    /**
+     * @param cluster Cluster implementation to wrap.
+     */
+    public IgniteClusterAsyncImpl(IgniteClusterImpl cluster) {
+        super(true); // NOTE(review): presumably the async-enabled flag of AsyncSupportAdapter - confirm.
+
+        this.cluster = cluster;
+    }
+
+    /** {@inheritDoc} */
+    @Override public ClusterNode localNode() {
+        return cluster.localNode();
+    }
+
+    /** {@inheritDoc} */
+    @Override public ClusterGroup forLocal() {
+        return cluster.forLocal();
+    }
+
+    /** {@inheritDoc} */
+    @Override public <K, V> ClusterNodeLocalMap<K, V> nodeLocalMap() {
+        return cluster.nodeLocalMap();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean pingNode(UUID nodeId) {
+        return cluster.pingNode(nodeId);
+    }
+
+    /** {@inheritDoc} */
+    @Override public long topologyVersion() {
+        return cluster.topologyVersion();
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public Collection<ClusterNode> topology(long topVer) {
+        return cluster.topology(topVer);
+    }
+
+    /** {@inheritDoc} */
+    @Override public <K> Map<ClusterNode, Collection<K>> mapKeysToNodes(@Nullable String cacheName,
+        @Nullable Collection<? extends K> keys) {
+        return cluster.mapKeysToNodes(cacheName, keys);
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public <K> ClusterNode mapKeyToNode(@Nullable String cacheName, K key) {
+        return cluster.mapKeyToNode(cacheName, key);
+    }
+
+    /** {@inheritDoc} Passes the future from {@code startNodesAsync} to {@code saveOrGet} instead of blocking on it here. */
+    @Override public Collection<GridTuple3<String, Boolean, String>> startNodes(File file,
+        boolean restart,
+        int timeout,
+        int maxConn)
+    {
+        try {
+            return saveOrGet(cluster.startNodesAsync(file, restart, timeout, maxConn));
+        }
+        catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
+    }
+
+    /** {@inheritDoc} Passes the future from {@code startNodesAsync} to {@code saveOrGet} instead of blocking on it here. */
+    @Override public Collection<GridTuple3<String, Boolean, String>> startNodes(
+        Collection<Map<String, Object>> hosts,
+        @Nullable Map<String, Object> dflts,
+        boolean restart,
+        int timeout,
+        int maxConn)
+    {
+        try {
+            return saveOrGet(cluster.startNodesAsync(hosts, dflts, restart, timeout, maxConn));
+        }
+        catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void stopNodes() {
+        cluster.stopNodes();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void stopNodes(Collection<UUID> ids) {
+        cluster.stopNodes(ids);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void restartNodes() {
+        cluster.restartNodes();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void restartNodes(Collection<UUID> ids) {
+        cluster.restartNodes(ids);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void resetMetrics() {
+        cluster.resetMetrics();
+    }
+
+    /** {@inheritDoc} */
+    @Override public Ignite ignite() {
+        return cluster.ignite();
+    }
+
+    /** {@inheritDoc} */
+    @Override public ClusterGroup forNodes(Collection<? extends ClusterNode> nodes) {
+        return cluster.forNodes(nodes);
+    }
+
+    /** {@inheritDoc} */
+    @Override public ClusterGroup forNode(ClusterNode node, ClusterNode... nodes) {
+        return cluster.forNode(node, nodes);
+    }
+
+    /** {@inheritDoc} */
+    @Override public ClusterGroup forOthers(ClusterNode node, ClusterNode... nodes) {
+        return cluster.forOthers(node, nodes);
+    }
+
+    /** {@inheritDoc} */
+    @Override public ClusterGroup forOthers(ClusterGroup prj) {
+        return cluster.forOthers(prj);
+    }
+
+    /** {@inheritDoc} */
+    @Override public ClusterGroup forNodeIds(Collection<UUID> ids) {
+        return cluster.forNodeIds(ids);
+    }
+
+    /** {@inheritDoc} */
+    @Override public ClusterGroup forNodeId(UUID id, UUID... ids) {
+        return cluster.forNodeId(id, ids);
+    }
+
+    /** {@inheritDoc} */
+    @Override public ClusterGroup forPredicate(IgnitePredicate<ClusterNode> p) {
+        return cluster.forPredicate(p);
+    }
+
+    /** {@inheritDoc} */
+    @Override public ClusterGroup forAttribute(String name, @Nullable String val) {
+        return cluster.forAttribute(name, val);
+    }
+
+    /** {@inheritDoc} */
+    @Override public ClusterGroup forCacheNodes(String cacheName) {
+        return cluster.forCacheNodes(cacheName);
+    }
+
+    /** {@inheritDoc} */
+    @Override public ClusterGroup forDataNodes(String cacheName) {
+        return cluster.forDataNodes(cacheName);
+    }
+
+    /** {@inheritDoc} */
+    @Override public ClusterGroup forClientNodes(String cacheName) {
+        return cluster.forClientNodes(cacheName);
+    }
+
+    /** {@inheritDoc} */
+    @Override public ClusterGroup forStreamer(String streamerName, @Nullable String... streamerNames) {
+        return cluster.forStreamer(streamerName, streamerNames);
+    }
+
+    /** {@inheritDoc} */
+    @Override public ClusterGroup forRemotes() {
+        return cluster.forRemotes();
+    }
+
+    /** {@inheritDoc} */
+    @Override public ClusterGroup forHost(ClusterNode node) {
+        return cluster.forHost(node);
+    }
+
+    /** {@inheritDoc} */
+    @Override public ClusterGroup forDaemons() {
+        return cluster.forDaemons();
+    }
+
+    /** {@inheritDoc} */
+    @Override public ClusterGroup forRandom() {
+        return cluster.forRandom();
+    }
+
+    /** {@inheritDoc} */
+    @Override public ClusterGroup forOldest() {
+        return cluster.forOldest();
+    }
+
+    /** {@inheritDoc} */
+    @Override public ClusterGroup forYoungest() {
+        return cluster.forYoungest();
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<ClusterNode> nodes() {
+        return cluster.nodes();
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public ClusterNode node(UUID id) {
+        return cluster.node(id);
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public ClusterNode node() {
+        return cluster.node();
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgnitePredicate<ClusterNode> predicate() {
+        return cluster.predicate();
+    }
+
+    /** {@inheritDoc} */
+    @Override public ClusterMetrics metrics() {
+        return cluster.metrics();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a67b72ca/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterEx.java b/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterEx.java
new file mode 100644
index 0000000..4c06031
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterEx.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cluster;
+
+import org.apache.ignite.*;
+
+/**
+ * Internal cluster interface that combines {@link IgniteCluster} with {@link ClusterGroupEx}; it declares no methods of its own.
+ */
+public interface IgniteClusterEx extends IgniteCluster, ClusterGroupEx {
+    // No-op.
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a67b72ca/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterImpl.java
new file mode 100644
index 0000000..95e5b5c
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterImpl.java
@@ -0,0 +1,508 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cluster;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.util.future.*;
+import org.apache.ignite.internal.util.lang.*;
+import org.apache.ignite.internal.util.nodestart.*;
+import org.apache.ignite.internal.util.tostring.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.net.*;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+import static org.apache.ignite.internal.IgniteNodeAttributes.*;
+import static org.apache.ignite.internal.util.nodestart.IgniteNodeStartUtils.*;
+
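+/**
+ * {@link IgniteClusterEx} implementation backed by the kernal context: topology queries, key-to-node mapping, node start/stop/restart.
+ */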
+public class IgniteClusterImpl extends ClusterGroupAdapter implements IgniteClusterEx, Externalizable {
+    /** Node configuration, taken from the kernal context in {@code setKernalContext}. */
+    private IgniteConfiguration cfg;
+
+    /** Node local store. */
+    @GridToStringExclude
+    private ClusterNodeLocalMap nodeLoc;
+
+    /** {@inheritDoc} Additionally caches the node configuration and creates the node-local map. */
+    @Override public void setKernalContext(GridKernalContext ctx) {
+        super.setKernalContext(ctx);
+
+        cfg = ctx.config();
+
+        nodeLoc = new ClusterNodeLocalMapImpl(ctx);
+    }
+
+    /** {@inheritDoc} Built as a new group over the single node ID taken from the configuration. */
+    @Override public ClusterGroup forLocal() {
+        guard();
+
+        try {
+            return new ClusterGroupAdapter(this, ctx, null, Collections.singleton(cfg.getNodeId()));
+        }
+        finally {
+            unguard();
+        }
+    }
+
+    /** {@inheritDoc} Read from the discovery manager and asserted to be non-null. */
+    @Override public ClusterNode localNode() {
+        guard();
+
+        try {
+            ClusterNode node = ctx.discovery().localNode();
+
+            assert node != null;
+
+            return node;
+        }
+        finally {
+            unguard();
+        }
+    }
+
+    /** {@inheritDoc} Returns the instance created in {@code setKernalContext}; the field is a raw type, hence the suppression. */
+    @SuppressWarnings("unchecked")
+    @Override public <K, V> ClusterNodeLocalMap<K, V> nodeLocalMap() {
+        guard();
+
+        try {
+            return nodeLoc;
+        }
+        finally {
+            unguard();
+        }
+    }
+
+    /** {@inheritDoc} The node ID is checked with {@code A.notNull} before the guard is entered. */
+    @Override public boolean pingNode(UUID nodeId) {
+        A.notNull(nodeId, "nodeId");
+
+        guard();
+
+        try {
+            return ctx.discovery().pingNode(nodeId);
+        }
+        finally {
+            unguard();
+        }
+    }
+
+    /** {@inheritDoc} Read from the discovery manager inside the guard. */
+    @Override public long topologyVersion() {
+        guard();
+
+        try {
+            return ctx.discovery().topologyVersion();
+        }
+        finally {
+            unguard();
+        }
+    }
+
+    /** {@inheritDoc} Returns whatever the discovery manager reports for the given topology version. */
+    @Override public Collection<ClusterNode> topology(long topVer) throws UnsupportedOperationException {
+        guard();
+
+        try {
+            return ctx.discovery().topology(topVer);
+        }
+        finally {
+            unguard();
+        }
+    }
+
+    /** {@inheritDoc} Returns an empty map without entering the guard when {@code F.isEmpty(keys)} holds. */
+    @Override public <K> Map<ClusterNode, Collection<K>> mapKeysToNodes(@Nullable String cacheName,
+        @Nullable Collection<? extends K> keys)
+        throws IgniteException
+    {
+        if (F.isEmpty(keys))
+            return Collections.emptyMap();
+
+        guard();
+
+        try {
+            return ctx.affinity().mapKeysToNodes(cacheName, keys);
+        }
+        catch (IgniteCheckedException e) {
+            throw U.convertException(e); // Checked failure from the affinity call is rethrown via U.convertException.
+        }
+        finally {
+            unguard();
+        }
+    }
+
+    /** {@inheritDoc} The key is checked with {@code A.notNull} before the guard is entered. */
+    @Override public <K> ClusterNode mapKeyToNode(@Nullable String cacheName, K key) throws IgniteException {
+        A.notNull(key, "key");
+
+        guard();
+
+        try {
+            return ctx.affinity().mapKeyToNode(cacheName, key);
+        }
+        catch (IgniteCheckedException e) {
+            throw U.convertException(e); // Checked failure from the affinity call is rethrown via U.convertException.
+        }
+        finally {
+            unguard();
+        }
+    }
+
+    /** {@inheritDoc} Synchronous form: blocks on the future returned by {@code startNodesAsync}. */
+    @Override public Collection<GridTuple3<String, Boolean, String>> startNodes(File file,
+        boolean restart,
+        int timeout,
+        int maxConn)
+        throws IgniteException
+    {
+        try {
+            return startNodesAsync(file, restart, timeout, maxConn).get();
+        }
+        catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
+    }
+
+    /** {@inheritDoc} Synchronous form: blocks on the future returned by {@code startNodesAsync}. */
+    @Override public Collection<GridTuple3<String, Boolean, String>> startNodes(Collection<Map<String, Object>> hosts,
+        @Nullable Map<String, Object> dflts,
+        boolean restart,
+        int timeout,
+        int maxConn)
+        throws IgniteException
+    {
+        try {
+            return startNodesAsync(hosts, dflts, restart, timeout, maxConn).get();
+        }
+        catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
+    }
+
+    /** {@inheritDoc} Executes {@link IgniteKillTask} with restart flag {@code false} through {@code compute()}. */
+    @Override public void stopNodes() throws IgniteException {
+        guard();
+
+        try {
+            compute().execute(IgniteKillTask.class, false);
+        }
+        finally {
+            unguard();
+        }
+    }
+
+    /** {@inheritDoc} Executes {@link IgniteKillTask} with restart flag {@code false} on the group built from the given IDs. */
+    @Override public void stopNodes(Collection<UUID> ids) throws IgniteException {
+        guard();
+
+        try {
+            ctx.grid().compute(forNodeIds(ids)).execute(IgniteKillTask.class, false);
+        }
+        finally {
+            unguard();
+        }
+    }
+
+    /** {@inheritDoc} Executes {@link IgniteKillTask} with restart flag {@code true} through {@code compute()}. */
+    @Override public void restartNodes() throws IgniteException {
+        guard();
+
+        try {
+            compute().execute(IgniteKillTask.class, true);
+        }
+        finally {
+            unguard();
+        }
+    }
+
+    /** {@inheritDoc} Executes {@link IgniteKillTask} with restart flag {@code true} on the group built from the given IDs. */
+    @Override public void restartNodes(Collection<UUID> ids) throws IgniteException {
+        guard();
+
+        try {
+            ctx.grid().compute(forNodeIds(ids)).execute(IgniteKillTask.class, true);
+        }
+        finally {
+            unguard();
+        }
+    }
+
+    /** {@inheritDoc} Resets job, I/O and task metrics, in that order, inside the guard. */
+    @Override public void resetMetrics() {
+        guard();
+
+        try {
+            ctx.jobMetric().reset();
+            ctx.io().resetMetrics();
+            ctx.task().resetMetrics();
+        }
+        finally {
+            unguard();
+        }
+    }
+
+    /** {@inheritDoc} Every call wraps this cluster in a new {@link IgniteClusterAsyncImpl}. */
+    @Override public IgniteCluster withAsync() {
+        return new IgniteClusterAsyncImpl(this);
+    }
+
+    /** {@inheritDoc} Always {@code false} for this implementation. */
+    @Override public boolean isAsync() {
+        return false;
+    }
+
+    /** {@inheritDoc} Always throws {@link IllegalStateException} for this implementation. */
+    @Override public <R> IgniteFuture<R> future() {
+        throw new IllegalStateException("Asynchronous mode is not enabled.");
+    }
+
+    /**
+     * @param file Configuration file; must be non-null, exist and be a regular file.
+     * @param restart Whether to stop existing nodes.
+     * @param timeout Connection timeout.
+     * @param maxConn Number of parallel SSH connections to one host.
+     * @return Future with results; a failure to parse the file is returned as a finished future carrying the exception.
+     * @see IgniteCluster#startNodes(java.io.File, boolean, int, int)
+     */
+    IgniteInternalFuture<Collection<GridTuple3<String, Boolean, String>>> startNodesAsync(File file,
+      boolean restart,
+      int timeout,
+      int maxConn)
+    {
+        A.notNull(file, "file");
+        A.ensure(file.exists(), "file doesn't exist.");
+        A.ensure(file.isFile(), "file is a directory.");
+
+        try {
+            IgniteBiTuple<Collection<Map<String, Object>>, Map<String, Object>> t = parseFile(file);
+
+            return startNodesAsync(t.get1(), t.get2(), restart, timeout, maxConn);
+        }
+        catch (IgniteCheckedException e) {
+            return new GridFinishedFuture<>(ctx, e);
+        }
+    }
+
+    /**
+     * @param hosts Startup parameters.
+     * @param dflts Default values.
+     * @param restart Whether to stop existing nodes.
+     * @param timeout Connection timeout in milliseconds.
+     * @param maxConn Number of parallel SSH connections to one host.
+     * @return Future with results; a checked failure during preparation is returned as a finished future carrying it.
+     * @see IgniteCluster#startNodes(java.util.Collection, java.util.Map, boolean, int, int)
+     */
+    IgniteInternalFuture<Collection<GridTuple3<String, Boolean, String>>> startNodesAsync(
+        Collection<Map<String, Object>> hosts,
+        @Nullable Map<String, Object> dflts,
+        boolean restart,
+        int timeout,
+        int maxConn)
+    {
+        A.notNull(hosts, "hosts");
+
+        guard();
+
+        try {
+            IgniteSshProcessor sshProcessor = IgniteComponentType.SSH.create(false);
+
+            Map<String, Collection<IgniteRemoteStartSpecification>> specsMap = specifications(hosts, dflts);
+
+            Map<String, ConcurrentLinkedQueue<IgniteNodeCallable>> runMap = new HashMap<>();
+
+            int nodeCallCnt = 0;
+
+            for (String host : specsMap.keySet()) {
+                InetAddress addr;
+
+                try {
+                    addr = InetAddress.getByName(host);
+                }
+                catch (UnknownHostException e) {
+                    throw new IgniteCheckedException("Invalid host name: " + host, e);
+                }
+
+                Collection<? extends ClusterNode> neighbors = null;
+
+                if (addr.isLoopbackAddress())
+                    neighbors = neighbors();
+                else {
+                    for (Collection<ClusterNode> p : U.neighborhood(nodes()).values()) {
+                        ClusterNode node = F.first(p);
+
+                        if (node.<String>attribute(ATTR_IPS).contains(addr.getHostAddress())) {
+                            neighbors = p;
+
+                            break;
+                        }
+                    }
+                }
+
+                int startIdx = 1;
+
+                if (neighbors != null) {
+                    if (restart && !neighbors.isEmpty()) {
+                        try {
+                            ctx.grid().compute(forNodes(neighbors)).execute(IgniteKillTask.class, false); // Stop only; new nodes are queued below.
+                        }
+                        catch (ClusterGroupEmptyException ignored) {
+                            // No-op, nothing to restart.
+                        }
+                    }
+                    else
+                        startIdx = neighbors.size() + 1; // Nodes already found on the host count towards spec.nodes().
+                }
+
+                ConcurrentLinkedQueue<IgniteNodeCallable> nodeRuns = new ConcurrentLinkedQueue<>();
+
+                runMap.put(host, nodeRuns);
+
+                for (IgniteRemoteStartSpecification spec : specsMap.get(host)) {
+                    assert spec.host().equals(host);
+
+                    for (int i = startIdx; i <= spec.nodes(); i++) {
+                        nodeRuns.add(sshProcessor.nodeStartCallable(spec, timeout));
+
+                        nodeCallCnt++;
+                    }
+                }
+            }
+
+            // If there is nothing to start, return finished future with empty result.
+            if (nodeCallCnt == 0)
+                return new GridFinishedFuture<Collection<GridTuple3<String, Boolean, String>>>(
+                    ctx, Collections.<GridTuple3<String, Boolean, String>>emptyList());
+
+            // Exceeding max line width for readability.
+            GridCompoundFuture<GridTuple3<String, Boolean, String>, Collection<GridTuple3<String, Boolean, String>>>
+                fut = new GridCompoundFuture<>(
+                ctx,
+                CU.<GridTuple3<String, Boolean, String>>objectsReducer()
+            );
+
+            AtomicInteger cnt = new AtomicInteger(nodeCallCnt);
+
+            // Limit maximum simultaneous connection number per host. NOTE(review): maxConn <= 0 appears to start nothing, so fut is never marked initialized - confirm.
+            for (ConcurrentLinkedQueue<IgniteNodeCallable> queue : runMap.values()) {
+                for (int i = 0; i < maxConn; i++) {
+                    if (!runNextNodeCallable(queue, fut, cnt))
+                        break;
+                }
+            }
+
+            return fut;
+        }
+        catch (IgniteCheckedException e) {
+            return new GridFinishedFuture<>(ctx, e);
+        }
+        finally {
+            unguard();
+        }
+    }
+
+    /**
+     * Gets all grid nodes that reside on the same physical computer as the local grid node.
+     * Local grid node is excluded.
+     * <p>
+     * Detection of the same physical computer is based on comparing the {@code ATTR_MACS} node attribute.
+     * If two nodes have an equal value of that attribute, Ignite considers these nodes running on the same
+     * physical computer.
+     * @return Grid nodes that reside on the same physical computer as local grid node.
+     */
+    private Collection<ClusterNode> neighbors() {
+        Collection<ClusterNode> neighbors = new ArrayList<>(1);
+
+        String macs = localNode().attribute(ATTR_MACS);
+
+        assert macs != null;
+
+        for (ClusterNode n : forOthers(localNode()).nodes()) {
+            if (macs.equals(n.attribute(ATTR_MACS)))
+                neighbors.add(n);
+        }
+
+        return neighbors;
+    }
+
+    /**
+     * Runs next callable from host node start queue and chains the following one to its completion.
+     *
+     * @param queue Queue of tasks to poll from.
+     * @param comp Compound future that comprise all started node tasks.
+     * @param cnt Atomic counter to check if all futures are added to compound future.
+     * @return {@code True} if task was started, {@code false} if queue was empty.
+     */
+    private boolean runNextNodeCallable(final ConcurrentLinkedQueue<IgniteNodeCallable> queue,
+        final GridCompoundFuture<GridTuple3<String, Boolean, String>, Collection<GridTuple3<String, Boolean, String>>>
+        comp,
+        final AtomicInteger cnt)
+    {
+        IgniteNodeCallable call = queue.poll();
+
+        if (call == null)
+            return false;
+
+        IgniteInternalFuture<GridTuple3<String, Boolean, String>> fut = ctx.closure().callLocalSafe(call, true);
+
+        comp.add(fut);
+
+        if (cnt.decrementAndGet() == 0) // Counter starts at the total number of callables, so zero means all were added.
+            comp.markInitialized();
+
+        fut.listenAsync(new CI1<IgniteInternalFuture<GridTuple3<String, Boolean, String>>>() {
+            @Override public void apply(IgniteInternalFuture<GridTuple3<String, Boolean, String>> f) {
+                runNextNodeCallable(queue, comp, cnt); // When this start finishes, take the next callable of the same queue.
+            }
+        });
+
+        return true;
+    }
+
+    /** {@inheritDoc} Reads back the kernal context object written by {@code writeExternal}. */
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        ctx = (GridKernalContext)in.readObject();
+    }
+
+    /** {@inheritDoc} Only the kernal context is written; {@code cfg} and the node-local map are not. */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        out.writeObject(ctx);
+    }
+
+    /** {@inheritDoc} Resolves to the cluster instance of the grid reachable from the deserialized context. */
+    @Override protected Object readResolve() throws ObjectStreamException {
+        return ctx.grid().cluster();
+    }
+
+    /** {@inheritDoc} Reports only the grid name taken from the kernal context. */
+    @Override public String toString() {
+        return "IgniteCluster [igniteName=" + ctx.gridName() + ']';
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a67b72ca/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteKillTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteKillTask.java b/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteKillTask.java
new file mode 100644
index 0000000..bd81bc3
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteKillTask.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cluster;
+
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.compute.*;
+import org.apache.ignite.internal.processors.task.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import java.util.*;
+
+import static org.apache.ignite.internal.IgniteNodeAttributes.*;
+
+/**
+ * Special kill task that never fails over jobs.
+ */
+@GridInternal
+class IgniteKillTask extends ComputeTaskAdapter<Boolean, Void> {
+    /** Serialization version. */
+    private static final long serialVersionUID = 0L;
+
+    /** Restart flag, assigned from the task argument in {@code map} and read by the kill jobs. */
+    private boolean restart;
+
+    /** {@inheritDoc} Stores the restart flag and maps one kill job to every non-daemon node of the subgrid. */
+    @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, Boolean restart) {
+        assert restart != null;
+
+        this.restart = restart;
+
+        Map<ComputeJob, ClusterNode> jobs = U.newHashMap(subgrid.size());
+
+        for (ClusterNode n : subgrid)
+            if (!daemon(n))
+                jobs.put(new IgniteKillJob(), n);
+
+        return jobs;
+    }
+
+    /**
+     * Checks if given node is a daemon node.
+     *
+     * @param n Node.
+     * @return Whether node is daemon, i.e. its {@code ATTR_DAEMON} attribute equals "true" ignoring case.
+     */
+    private boolean daemon(ClusterNode n) {
+        return "true".equalsIgnoreCase(n.<String>attribute(ATTR_DAEMON));
+    }
+
+    /** {@inheritDoc} Always answers {@code WAIT}, whatever the job result was. */
+    @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) {
+        return ComputeJobResultPolicy.WAIT;
+    }
+
+    /** {@inheritDoc} There is nothing to reduce; always returns {@code null}. */
+    @Override public Void reduce(List<ComputeJobResult> results) {
+        return null;
+    }
+
+    /**
+     * Kill job. Declared as an inner (non-static) class, so {@code restart} below is the enclosing task's field.
+     */
+    private class IgniteKillJob extends ComputeJobAdapter {
+        /** Serialization version. */
+        private static final long serialVersionUID = 0L;
+
+        /** {@inheritDoc} Starts a separate thread calling {@code G.restart(true)} or {@code G.kill(true)} and returns at once. */
+        @Override public Object execute() {
+            if (restart)
+                new Thread(new Runnable() {
+                    @Override public void run() {
+                        G.restart(true);
+                    }
+                },
+                "ignite-restarter").start();
+            else
+                new Thread(new Runnable() {
+                    @Override public void run() {
+                        G.kill(true);
+                    }
+                },
+                "ignite-stopper").start();
+
+            return null;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a67b72ca/modules/core/src/main/java/org/apache/ignite/internal/executor/GridExecutorService.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/executor/GridExecutorService.java b/modules/core/src/main/java/org/apache/ignite/internal/executor/GridExecutorService.java
index 96793af..1d0ef09 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/executor/GridExecutorService.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/executor/GridExecutorService.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.executor;
 
 import org.apache.ignite.*;
 import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.cluster.*;
 import org.apache.ignite.internal.compute.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a67b72ca/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 808c62f..18dad2e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -194,7 +194,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>,
 
     /** {@inheritDoc} */
     @Override public ClusterGroup gridProjection() {
-        return ctx.grid().forCacheNodes(name());
+        return ctx.grid().cluster().forCacheNodes(name());
     }
 
     /**
@@ -1541,7 +1541,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>,
     @Override public void clear(long timeout) throws IgniteCheckedException {
         try {
             // Send job to remote nodes only.
-            Collection<ClusterNode> nodes = ctx.grid().forCacheNodes(name()).forRemotes().nodes();
+            Collection<ClusterNode> nodes = ctx.grid().cluster().forCacheNodes(name()).forRemotes().nodes();
 
             IgniteInternalFuture<Object> fut = null;
 
@@ -1571,7 +1571,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>,
 
     /** {@inheritDoc} */
     @Override public IgniteInternalFuture<?> clearAsync() {
-        Collection<ClusterNode> nodes = ctx.grid().forCacheNodes(name()).nodes();
+        Collection<ClusterNode> nodes = ctx.grid().cluster().forCacheNodes(name()).nodes();
 
         if (!nodes.isEmpty()) {
             IgniteInternalFuture<Object> fut =
@@ -4012,7 +4012,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>,
 
         if (replaceExisting) {
             if (ctx.store().isLocalStore()) {
-                Collection<ClusterNode> nodes = ctx.grid().forDataNodes(name()).nodes();
+                Collection<ClusterNode> nodes = ctx.grid().cluster().forDataNodes(name()).nodes();
 
                 if (nodes.isEmpty())
                     return new GridFinishedFuture<>(ctx.kernalContext());
@@ -4033,7 +4033,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>,
             }
         }
         else {
-            Collection<ClusterNode> nodes = ctx.grid().forDataNodes(name()).nodes();
+            Collection<ClusterNode> nodes = ctx.grid().cluster().forDataNodes(name()).nodes();
 
             if (nodes.isEmpty())
                 return new GridFinishedFuture<>(ctx.kernalContext());
@@ -4182,7 +4182,9 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>,
 
         PeekModes modes = parsePeekModes(peekModes);
 
-        ClusterGroup grp = modes.near ? ctx.grid().forCacheNodes(name(), SIZE_NODES) : ctx.grid().forDataNodes(name());
+        IgniteClusterEx cluster = ctx.grid().cluster();
+
+        ClusterGroup grp = modes.near ? cluster.forCacheNodes(name(), SIZE_NODES) : cluster.forDataNodes(name());
 
         Collection<ClusterNode> nodes = grp.nodes();
 
@@ -4511,7 +4513,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>,
     private int globalSize(boolean primaryOnly) throws IgniteCheckedException {
         try {
             // Send job to remote nodes only.
-            Collection<ClusterNode> nodes = ctx.grid().forCacheNodes(name()).forRemotes().nodes();
+            Collection<ClusterNode> nodes = ctx.grid().cluster().forCacheNodes(name()).forRemotes().nodes();
 
             IgniteInternalFuture<Collection<Integer>> fut = null;
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a67b72ca/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index 9489e36..19bc444 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -346,7 +346,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
                 qry.getBufferSize(),
                 qry.getTimeInterval(),
                 qry.isAutoUnsubscribe(),
-                loc ? ctx.grid().forLocal() : null);
+                loc ? ctx.grid().cluster().forLocal() : null);
 
             final QueryCursor<Cache.Entry<K, V>> cur;
 
@@ -383,11 +383,11 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
     }
 
     /**
-     * @param local Enforce local.
+     * @param loc Enforce local.
      * @return Local node cluster group.
      */
-    private ClusterGroup projection(boolean local) {
-        return local ? ctx.kernalContext().grid().forLocal() : null;
+    private ClusterGroup projection(boolean loc) {
+        return loc ? ctx.kernalContext().grid().cluster().forLocal() : null;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a67b72ca/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
index 8c73cee..dc82e83 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
@@ -143,7 +143,7 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
                 topVer = ctx.affinity().affinityTopologyVersion();
 
                 // Send job to all data nodes.
-                Collection<ClusterNode> nodes = ctx.grid().forDataNodes(name()).nodes();
+                Collection<ClusterNode> nodes = ctx.grid().cluster().forDataNodes(name()).nodes();
 
                 if (!nodes.isEmpty()) {
                     ctx.closures().callAsyncNoFailover(BROADCAST,
@@ -174,7 +174,7 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
      * @param topVer Topology version.
      */
     private void removeAllAsync(final GridFutureAdapter<Void> opFut, final long topVer) {
-        Collection<ClusterNode> nodes = ctx.grid().forDataNodes(name()).nodes();
+        Collection<ClusterNode> nodes = ctx.grid().cluster().forDataNodes(name()).nodes();
 
         if (!nodes.isEmpty()) {
             IgniteInternalFuture<?> rmvFut = ctx.closures().callAsyncNoFailover(BROADCAST,

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a67b72ca/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index 67c1208..362077f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -1299,7 +1299,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
                     if (log.isDebugEnabled())
                         log.debug("Record [key=" + key + ", val=" + val + ", incBackups=" +
                             incBackups + "priNode=" + U.id8(CU.primaryNode(cctx, key).id()) +
-                            ", node=" + U.id8(cctx.grid().localNode().id()) + ']');
+                            ", node=" + U.id8(cctx.localNode().id()) + ']');
 
                     if (val == null) {
                         if (log.isDebugEnabled())

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a67b72ca/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index 8480211..0643c0e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -278,7 +278,7 @@ public class CacheContinuousQueryManager<K, V> extends GridCacheManagerAdapter<K
             true,
             false,
             true,
-            loc ? cctx.grid().forLocal() : null);
+            loc ? cctx.grid().cluster().forLocal() : null);
     }
 
     public void cancelInternalQuery(UUID routineId) {
@@ -347,7 +347,7 @@ public class CacheContinuousQueryManager<K, V> extends GridCacheManagerAdapter<K
         cctx.checkSecurity(GridSecurityPermission.CACHE_READ);
 
         if (grp == null)
-            grp = cctx.kernalContext().grid();
+            grp = cctx.kernalContext().grid().cluster();
 
         Collection<ClusterNode> nodes = grp.nodes();
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a67b72ca/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java
index 10ccfd0..ced8d1d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java
@@ -178,7 +178,7 @@ public class IgniteDataLoaderImpl<K, V> implements IgniteDataLoader<K, V>, Delay
 
         log = U.logger(ctx, logRef, IgniteDataLoaderImpl.class);
 
-        ClusterNode node = F.first(ctx.grid().forCacheNodes(cacheName).nodes());
+        ClusterNode node = F.first(ctx.grid().cluster().forCacheNodes(cacheName).nodes());
 
         if (node == null)
             throw new IllegalStateException("Cache doesn't exist: " + cacheName);
@@ -294,7 +294,7 @@ public class IgniteDataLoaderImpl<K, V> implements IgniteDataLoader<K, V>, Delay
         if (allow == allowOverwrite())
             return;
 
-        ClusterNode node = F.first(ctx.grid().forCacheNodes(cacheName).nodes());
+        ClusterNode node = F.first(ctx.grid().cluster().forCacheNodes(cacheName).nodes());
 
         if (node == null)
             throw new IgniteException("Failed to get node for cache: " + cacheName);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a67b72ca/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java
index 7e68ca4..5d96086 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java
@@ -118,7 +118,7 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite
 
             Collection<ClusterNode> nodes = dataNodes(ctx.affinity().affinityTopologyVersion());
 
-            qry.projection(ctx.grid().forNodes(nodes));
+            qry.projection(ctx.grid().cluster().forNodes(nodes));
 
             Iterable<Integer> col = (Iterable<Integer>)qry.execute(new SumReducer()).get();
 
@@ -349,7 +349,7 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite
 
             Collection<ClusterNode> nodes = dataNodes(ctx.affinity().affinityTopologyVersion());
 
-            qry.projection(ctx.grid().forNodes(nodes));
+            qry.projection(ctx.grid().cluster().forNodes(nodes));
 
             CacheQueryFuture<Map.Entry<T, ?>> fut = qry.execute();
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a67b72ca/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java
index 64ab391..983dd55 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java
@@ -347,7 +347,7 @@ public class GridCacheCommandHandler extends GridRestCommandHandlerAdapter {
                 chain(resultWrapper((CacheProjection<Object, Object>)prj, key));
         }
         else {
-            ClusterGroup prj = ctx.grid().forPredicate(F.nodeForNodeId(destId));
+            ClusterGroup prj = ctx.grid().cluster().forPredicate(F.nodeForNodeId(destId));
 
             ctx.task().setThreadContext(TC_NO_FAILOVER, true);
 
@@ -385,7 +385,7 @@ public class GridCacheCommandHandler extends GridRestCommandHandlerAdapter {
             return op.apply(cache, ctx).chain(resultWrapper(cache, key));
         }
         else {
-            ClusterGroup prj = ctx.grid().forPredicate(F.nodeForNodeId(destId));
+            ClusterGroup prj = ctx.grid().cluster().forPredicate(F.nodeForNodeId(destId));
 
             ctx.task().setThreadContext(TC_NO_FAILOVER, true);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a67b72ca/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheQueryCommandHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheQueryCommandHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheQueryCommandHandler.java
index bcf6935..93e3363 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheQueryCommandHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheQueryCommandHandler.java
@@ -130,7 +130,7 @@ public class GridCacheQueryCommandHandler extends GridRestCommandHandlerAdapter
 
             ctx.task().setThreadContext(TC_NO_FAILOVER, true);
 
-            return ctx.closure().callAsync(BALANCE, c, ctx.grid().forNodeId(destId).nodes());
+            return ctx.closure().callAsync(BALANCE, c, ctx.grid().cluster().forNodeId(destId).nodes());
         }
     }
 
@@ -144,7 +144,7 @@ public class GridCacheQueryCommandHandler extends GridRestCommandHandlerAdapter
 
         IgniteInternalFuture<Collection<Object>> fut = ctx.closure().callAsync(BROADCAST,
             Arrays.asList(c),
-            ctx.grid().forCacheNodes(cacheName).nodes());
+            ctx.grid().cluster().forCacheNodes(cacheName).nodes());
 
         return fut.chain(new C1<IgniteInternalFuture<Collection<Object>>, GridRestResponse>() {
             @Override public GridRestResponse apply(IgniteInternalFuture<Collection<Object>> fut) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a67b72ca/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskCommandHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskCommandHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskCommandHandler.java
index f9004c3..78b6bd1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskCommandHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskCommandHandler.java
@@ -199,7 +199,7 @@ public class GridTaskCommandHandler extends GridRestCommandHandlerAdapter {
                 else {
                     // Using predicate instead of node intentionally
                     // in order to provide user well-structured EmptyProjectionException.
-                    ClusterGroup prj = ctx.grid().forPredicate(F.nodeForNodeId(req.destinationId()));
+                    ClusterGroup prj = ctx.grid().cluster().forPredicate(F.nodeForNodeId(req.destinationId()));
 
                     ctx.task().setThreadContext(TC_NO_FAILOVER, true);
 
@@ -612,7 +612,7 @@ public class GridTaskCommandHandler extends GridRestCommandHandlerAdapter {
 
         /** {@inheritDoc} */
         @Override public Object call() throws Exception {
-            return g.compute(g.forSubjectId(clientId)).execute(
+            return g.compute(g.cluster().forSubjectId(clientId)).execute(
                 name,
                 !params.isEmpty() ? params.size() == 1 ? params.get(0) : params.toArray() : null);
         }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a67b72ca/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/top/GridTopologyCommandHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/top/GridTopologyCommandHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/top/GridTopologyCommandHandler.java
index a986d5e..e32f6f7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/top/GridTopologyCommandHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/top/GridTopologyCommandHandler.java
@@ -104,7 +104,7 @@ public class GridTopologyCommandHandler extends GridRestCommandHandlerAdapter {
                     // Always refresh topology so client see most up-to-date view.
                     ctx.discovery().alive(id);
 
-                    node = ctx.grid().node(id);
+                    node = ctx.grid().cluster().node(id);
 
                     if (ip != null && node != null && !containsIp(node.addresses(), ip))
                         node = null;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a67b72ca/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerContextImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerContextImpl.java
index 7d112eb..c2a992d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerContextImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerContextImpl.java
@@ -189,7 +189,7 @@ public class GridStreamerContextImpl implements StreamerContext {
         ClusterGroup prj = streamPrj.get();
 
         if (prj == null) {
-            prj = ctx.grid().forStreamer(streamer.name());
+            prj = ctx.grid().cluster().forStreamer(streamer.name());
 
             streamPrj.compareAndSet(null, prj);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a67b72ca/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheClearTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheClearTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheClearTask.java
index a49d568..f425d75 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheClearTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheClearTask.java
@@ -82,10 +82,11 @@ public class VisorCacheClearTask extends VisorOneNodeTask<String, IgniteBiTuple<
 
         /**
          * @param subJob Sub job to execute asynchronously.
+         * @param idx Index.
          * @return {@code true} If subJob was not completed and this job should be suspended.
          */
         private boolean callAsync(IgniteCallable<Integer> subJob, int idx) {
-            IgniteCompute compute = ignite.compute(ignite.forCacheNodes(cacheName)).withAsync();
+            IgniteCompute compute = ignite.compute(ignite.cluster().forCacheNodes(cacheName)).withAsync();
 
             compute.call(subJob);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a67b72ca/modules/core/src/main/java/org/apache/ignite/internal/visor/compute/VisorComputeCancelSessionsTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/compute/VisorComputeCancelSessionsTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/compute/VisorComputeCancelSessionsTask.java
index 02fd4f9..bf828e1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/compute/VisorComputeCancelSessionsTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/compute/VisorComputeCancelSessionsTask.java
@@ -66,7 +66,7 @@ public class VisorComputeCancelSessionsTask extends VisorMultiNodeTask<Map<UUID,
             Set<IgniteUuid> sesIds = arg.get(ignite.localNode().id());
 
             if (sesIds != null && !sesIds.isEmpty()) {
-                IgniteCompute compute = ignite.compute(ignite.forLocal());
+                IgniteCompute compute = ignite.compute(ignite.cluster().forLocal());
 
                 Map<IgniteUuid, ComputeTaskFuture<Object>> futs = compute.activeTaskFutures();
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a67b72ca/modules/core/src/main/java/org/apache/ignite/internal/visor/compute/VisorComputeResetMetricsTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/compute/VisorComputeResetMetricsTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/compute/VisorComputeResetMetricsTask.java
index c495964..6b6cc61 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/compute/VisorComputeResetMetricsTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/compute/VisorComputeResetMetricsTask.java
@@ -51,7 +51,7 @@ public class VisorComputeResetMetricsTask extends VisorOneNodeTask<Void, Void> {
 
         /** {@inheritDoc} */
         @Override protected Void run(Void arg) {
-            ignite.resetMetrics();
+            ignite.cluster().resetMetrics();
 
             return null;
         }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a67b72ca/modules/core/src/main/java/org/apache/ignite/internal/visor/compute/VisorComputeToggleMonitoringTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/compute/VisorComputeToggleMonitoringTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/compute/VisorComputeToggleMonitoringTask.java
index 12f42d3..b857eee 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/compute/VisorComputeToggleMonitoringTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/compute/VisorComputeToggleMonitoringTask.java
@@ -75,7 +75,7 @@ public class VisorComputeToggleMonitoringTask extends
             if (checkExplicitTaskMonitoring(ignite))
                 return true;
             else {
-                ClusterNodeLocalMap<String, VisorComputeMonitoringHolder> storage = ignite.nodeLocalMap();
+                ClusterNodeLocalMap<String, VisorComputeMonitoringHolder> storage = ignite.cluster().nodeLocalMap();
 
                 VisorComputeMonitoringHolder holder = storage.get(COMPUTE_MONITORING_HOLDER_KEY);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a67b72ca/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJob.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJob.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJob.java
index c32a23b..e576704 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJob.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJob.java
@@ -84,7 +84,7 @@ public class VisorNodeDataCollectorJob extends VisorJob<VisorNodeDataCollectorTa
                 res.taskMonitoringEnabled(arg.taskMonitoringEnabled());
 
                 if (arg.taskMonitoringEnabled()) {
-                    ClusterNodeLocalMap<String, VisorComputeMonitoringHolder> storage = ignite.nodeLocalMap();
+                    ClusterNodeLocalMap<String, VisorComputeMonitoringHolder> storage = ignite.cluster().nodeLocalMap();
 
                     VisorComputeMonitoringHolder holder = storage.get(COMPUTE_MONITORING_HOLDER_KEY);
 
@@ -221,7 +221,7 @@ public class VisorNodeDataCollectorJob extends VisorJob<VisorNodeDataCollectorTa
         VisorNodeDataCollectorTaskArg arg) {
         res.gridName(ignite.name());
 
-        res.topologyVersion(ignite.topologyVersion());
+        res.topologyVersion(ignite.cluster().topologyVersion());
 
         long start0 = U.currentTimeMillis();
 


Mime
View raw message