ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From anovi...@apache.org
Subject [4/6] ignite git commit: Merge master into ignite-843
Date Mon, 12 Oct 2015 10:32:15 GMT
Merge master into ignite-843


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ae09fa9f
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ae09fa9f
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ae09fa9f

Branch: refs/heads/ignite-843
Commit: ae09fa9f26b9171636f22cdc11c61a77ad3cd4f4
Parents: 49514e8 1223525
Author: Andrey <anovikov@gridgain.com>
Authored: Mon Oct 12 16:28:55 2015 +0700
Committer: Andrey <anovikov@gridgain.com>
Committed: Mon Oct 12 16:29:06 2015 +0700

----------------------------------------------------------------------
 .../discovery/GridDiscoveryManager.java         | 479 ++++++------
 .../processors/cache/GridCacheProcessor.java    | 157 ++--
 .../cache/query/GridCacheQueryManager.java      | 719 +++++++++----------
 .../processors/rest/GridRestProcessor.java      | 176 +++--
 .../handlers/cache/GridCacheCommandHandler.java | 313 ++++----
 .../handlers/query/QueryCommandHandler.java     | 156 ++--
 .../top/GridTopologyCommandHandler.java         |  18 +-
 .../rest/request/RestQueryRequest.java          |  26 +-
 .../http/jetty/GridJettyRestHandler.java        | 151 ++--
 9 files changed, 1055 insertions(+), 1140 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/ae09fa9f/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 6aba211,9e54f6f..b74fbdb
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@@ -134,33 -134,33 +134,24 @@@ import static org.apache.ignite.plugin.
   * Discovery SPI manager.
   */
  public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
++    /** Discovery cached history size. */
++    protected static final int DISCOVERY_HISTORY_SIZE = 100;
      /** Fake key for {@code null}-named caches. Used inside {@link DiscoCache}. */
      private static final String NULL_CACHE_NAME = UUID.randomUUID().toString();
--
      /** Metrics update frequency. */
      private static final long METRICS_UPDATE_FREQ = 3000;
--
      /** */
      private static final MemoryMXBean mem = ManagementFactory.getMemoryMXBean();
--
      /** */
      private static final OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean();
--
      /** */
      private static final RuntimeMXBean rt = ManagementFactory.getRuntimeMXBean();
--
      /** */
      private static final ThreadMXBean threads = ManagementFactory.getThreadMXBean();
--
      /** */
      private static final Collection<GarbageCollectorMXBean> gc = ManagementFactory.getGarbageCollectorMXBeans();
--
      /** */
      private static final String PREFIX = "Topology snapshot";
--
--    /** Discovery cached history size. */
--    protected static final int DISCOVERY_HISTORY_SIZE = 100;
--
      /** Predicate filtering out daemon nodes. */
      private static final IgnitePredicate<ClusterNode> daemonFilter = new P1<ClusterNode>() {
          @Override public boolean apply(ClusterNode n) {
@@@ -185,88 -185,88 +176,83 @@@
  
      /** Discovery event worker. */
      private final DiscoveryWorker discoWrk = new DiscoveryWorker();
--
++    /** Last logged topology. */
++    private final AtomicLong lastLoggedTop = new AtomicLong();
++    /** Last segment check result. */
++    private final AtomicBoolean lastSegChkRes = new AtomicBoolean(true);
++    /** Topology cache history. */
++    private final Map<AffinityTopologyVersion, DiscoCache> discoCacheHist =
++        new GridBoundedConcurrentOrderedMap<>(DISCOVERY_HISTORY_SIZE);
++    /** Topology version. */
++    private final AtomicReference<Snapshot> topSnap =
++        new AtomicReference<>(new Snapshot(AffinityTopologyVersion.ZERO, null));
++    /** */
++    private final GridSpinBusyLock busyLock = new GridSpinBusyLock();
++    /** Received custom messages history. */
++    private final ArrayDeque<IgniteUuid> rcvdCustomMsgs = new ArrayDeque<>();
++    /** */
++    private final CountDownLatch startLatch = new CountDownLatch(1);
      /** Network segment check worker. */
      private SegmentCheckWorker segChkWrk;
--
      /** Network segment check thread. */
      private IgniteThread segChkThread;
--
--    /** Last logged topology. */
--    private final AtomicLong lastLoggedTop = new AtomicLong();
--
      /** Local node. */
      private ClusterNode locNode;
--
      /** Local node daemon flag. */
      private boolean isLocDaemon;
--
      /** {@code True} if resolvers were configured and network segment check is enabled. */
      private boolean hasRslvrs;
--
--    /** Last segment check result. */
--    private final AtomicBoolean lastSegChkRes = new AtomicBoolean(true);
--
--    /** Topology cache history. */
--    private final Map<AffinityTopologyVersion, DiscoCache> discoCacheHist =
--        new GridBoundedConcurrentOrderedMap<>(DISCOVERY_HISTORY_SIZE);
--
      /** Topology snapshots history. */
      private volatile Map<Long, Collection<ClusterNode>> topHist = new HashMap<>();
--
--    /** Topology version. */
--    private final AtomicReference<Snapshot> topSnap =
--        new AtomicReference<>(new Snapshot(AffinityTopologyVersion.ZERO, null));
--
      /** Minor topology version. */
      private int minorTopVer;
--
      /** Order supported flag. */
      private boolean discoOrdered;
--
      /** Topology snapshots history supported flag. */
      private boolean histSupported;
--
      /** Configured network segment check frequency. */
      private long segChkFreq;
--
      /** Local node join to topology event. */
      private GridFutureAdapter<DiscoveryEvent> locJoinEvt = new GridFutureAdapter<>();
--
      /** GC CPU load. */
      private volatile double gcCpuLoad;
--
      /** CPU load. */
      private volatile double cpuLoad;
--
      /** Metrics. */
      private final GridLocalMetrics metrics = createMetrics();
--
      /** Metrics update worker. */
      private GridTimeoutProcessor.CancelableTask metricsUpdateTask;
--
      /** Custom event listener. */
      private ConcurrentMap<Class<?>, List<CustomEventListener<DiscoveryCustomMessage>>> customEvtLsnrs =
          new ConcurrentHashMap8<>();
--
      /** Map of dynamic cache filters. */
      private Map<String, CachePredicate> registeredCaches = new HashMap<>();
  
--    /** */
--    private final GridSpinBusyLock busyLock = new GridSpinBusyLock();
--
--    /** Received custom messages history. */
--    private final ArrayDeque<IgniteUuid> rcvdCustomMsgs = new ArrayDeque<>();
--
--    /** */
--    private final CountDownLatch startLatch = new CountDownLatch(1);
--
      /** @param ctx Context. */
      public GridDiscoveryManager(GridKernalContext ctx) {
          super(ctx, ctx.config().getDiscoverySpi());
      }
  
      /**
++     * @param nodes Nodes.
++     * @return Total CPUs.
++     */
++    private static int cpus(Collection<ClusterNode> nodes) {
++        Collection<String> macSet = new HashSet<>(nodes.size(), 1.0f);
++
++        int cpus = 0;
++
++        for (ClusterNode n : nodes) {
++            String macs = n.attribute(ATTR_MACS);
++
++            if (macSet.add(macs))
++                cpus += n.metrics().getTotalCpus();
++        }
++
++        return cpus;
++    }
++
++    /**
       * @return Memory usage of non-heap memory.
       */
      private MemoryUsage nonHeapMemoryUsage() {
@@@ -1063,25 -1063,25 +1049,6 @@@
      }
  
      /**
--     * @param nodes Nodes.
--     * @return Total CPUs.
--     */
--    private static int cpus(Collection<ClusterNode> nodes) {
--        Collection<String> macSet = new HashSet<>(nodes.size(), 1.0f);
--
--        int cpus = 0;
--
--        for (ClusterNode n : nodes) {
--            String macs = n.attribute(ATTR_MACS);
--
--            if (macSet.add(macs))
--                cpus += n.metrics().getTotalCpus();
--        }
--
--        return cpus;
--    }
--
--    /**
       * Prints the latest topology info into log taking into account logging/verbosity settings.
       */
      public void ackTopology() {
@@@ -1812,105 -1812,105 +1779,291 @@@
          ).start();
      }
  
 -    /** Worker for network segment checks. */
 -    private class SegmentCheckWorker extends GridWorker {
++    /** Discovery topology future. */
++    private static class DiscoTopologyFuture extends GridFutureAdapter<Long> implements GridLocalEventListener {
+         /** */
 -        private final BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
++        private static final long serialVersionUID = 0L;
+ 
 -        /**
 -         *
 -         */
 -        private SegmentCheckWorker() {
 -            super(ctx.gridName(), "disco-net-seg-chk-worker", GridDiscoveryManager.this.log);
++        /** */
++        private GridKernalContext ctx;
+ 
 -            assert hasRslvrs;
 -            assert segChkFreq > 0;
++        /** Topology await version. */
++        private long awaitVer;
++
++        /** Empty constructor required by {@link Externalizable}. */
++        private DiscoTopologyFuture() {
++            // No-op.
+         }
+ 
+         /**
 -         *
++         * @param ctx Context.
++         * @param awaitVer Await version.
+          */
 -        public void scheduleSegmentCheck() {
 -            queue.add(new Object());
++        private DiscoTopologyFuture(GridKernalContext ctx, long awaitVer) {
++            this.ctx = ctx;
++            this.awaitVer = awaitVer;
+         }
+ 
 -        /** {@inheritDoc} */
 -        @SuppressWarnings("StatementWithEmptyBody")
 -        @Override protected void body() throws InterruptedException {
 -            long lastChk = 0;
++        /** Initializes future. */
++        private void init() {
++            ctx.event().addLocalEventListener(this, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED);
+ 
 -            while (!isCancelled()) {
 -                Object req = queue.poll(2000, MILLISECONDS);
++            // Close potential window.
++            long topVer = ctx.discovery().topologyVersion();
+ 
 -                long now = U.currentTimeMillis();
++            if (topVer >= awaitVer)
++                onDone(topVer);
++        }
+ 
 -                // Check frequency if segment check has not been requested.
 -                if (req == null && (segChkFreq == 0 || lastChk + segChkFreq >= now)) {
 -                    if (log.isDebugEnabled())
 -                        log.debug("Skipping segment check as it has not been requested and it is not time to check.");
++        /** {@inheritDoc} */
++        @Override public boolean onDone(@Nullable Long res, @Nullable Throwable err) {
++            if (super.onDone(res, err)) {
++                ctx.event().removeLocalEventListener(this, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED);
+ 
 -                    continue;
 -                }
++                return true;
++            }
+ 
 -                // We should always check segment if it has been explicitly
 -                // requested (on any node failure or leave).
 -                assert req != null || lastChk + segChkFreq < now;
++            return false;
++        }
+ 
 -                // Drain queue.
 -                while (queue.poll() != null) {
 -                    // No-op.
 -                }
++        /** {@inheritDoc} */
++        @Override public void onEvent(Event evt) {
++            assert evt.type() == EVT_NODE_JOINED || evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED;
+ 
 -                if (lastSegChkRes.get()) {
 -                    boolean segValid = ctx.segmentation().isValidSegment();
++            DiscoveryEvent discoEvt = (DiscoveryEvent)evt;
+ 
 -                    lastChk = now;
++            if (discoEvt.topologyVersion() >= awaitVer)
++                onDone(discoEvt.topologyVersion());
++        }
++    }
+ 
 -                    if (!segValid) {
 -                        discoWrk.addEvent(EVT_NODE_SEGMENTED, AffinityTopologyVersion.NONE, getSpi().getLocalNode(),
 -                            Collections.<ClusterNode>emptyList(), null);
++    /**
++     *
++     */
++    private static class Snapshot {
++        /** */
++        private final AffinityTopologyVersion topVer;
+ 
 -                        lastSegChkRes.set(false);
 -                    }
++        /** */
++        @GridToStringExclude
++        private final DiscoCache discoCache;
+ 
 -                    if (log.isDebugEnabled())
 -                        log.debug("Segment has been checked [requested=" + (req != null) + ", valid=" + segValid + ']');
 -                }
 -            }
++        /**
++         * @param topVer Topology version.
++         * @param discoCache Disco cache.
++         */
++        private Snapshot(AffinityTopologyVersion topVer, DiscoCache discoCache) {
++            this.topVer = topVer;
++            this.discoCache = discoCache;
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public String toString() {
 -            return S.toString(SegmentCheckWorker.class, this);
++            return S.toString(Snapshot.class, this);
+         }
+     }
+ 
 -    /** Worker for discovery events. */
 -    private class DiscoveryWorker extends GridWorker {
 -        /** Event queue. */
 -        private final BlockingQueue<GridTuple5<Integer, AffinityTopologyVersion, ClusterNode, Collection<ClusterNode>,
 -            DiscoveryCustomMessage>> evts = new LinkedBlockingQueue<>();
++    /**
++     * Cache predicate.
++     */
++    private static class CachePredicate {
++        /** Cache filter. */
++        private final IgnitePredicate<ClusterNode> cacheFilter;
+ 
 -        /** Node segmented event fired flag. */
 -        private boolean nodeSegFired;
++        /** If near cache is enabled on data nodes. */
++        private final boolean nearEnabled;
++
++        /** Cache mode. */
++        private final CacheMode cacheMode;
++
++        /** Collection of client near nodes. */
++        private final ConcurrentHashMap<UUID, Boolean> clientNodes;
+ 
+         /**
 -         *
++         * @param cacheFilter Cache filter.
++         * @param nearEnabled Near enabled flag.
++         * @param cacheMode Cache mode.
+          */
 -        private DiscoveryWorker() {
 -            super(ctx.gridName(), "disco-event-worker", GridDiscoveryManager.this.log);
++        private CachePredicate(IgnitePredicate<ClusterNode> cacheFilter, boolean nearEnabled, CacheMode cacheMode) {
++            assert cacheFilter != null;
++
++            this.cacheFilter = cacheFilter;
++            this.nearEnabled = nearEnabled;
++            this.cacheMode = cacheMode;
++
++            clientNodes = new ConcurrentHashMap<>();
+         }
+ 
+         /**
 -         * Method is called when any discovery event occurs.
 -         *
 -         * @param type Discovery event type. See {@link DiscoveryEvent} for more details.
 -         * @param topVer Topology version.
 -         * @param node Remote node this event is connected with.
 -         * @param topSnapshot Topology snapshot.
++         * @param nodeId Near node ID to add.
++         * @param nearEnabled Near enabled flag.
++         * @return {@code True} if new node ID was added.
+          */
 -        @SuppressWarnings("RedundantTypeArguments")
 -        private void recordEvent(int type, long topVer, ClusterNode node, Collection<ClusterNode> topSnapshot) {
++        public boolean addClientNode(UUID nodeId, boolean nearEnabled) {
++            assert nodeId != null;
++
++            Boolean old = clientNodes.putIfAbsent(nodeId, nearEnabled);
++
++            return old == null;
++        }
++
++        /**
++         * @param leftNodeId Left node ID.
++         * @return {@code True} if existing node ID was removed.
++         */
++        public boolean onNodeLeft(UUID leftNodeId) {
++            assert leftNodeId != null;
++
++            Boolean old = clientNodes.remove(leftNodeId);
++
++            return old != null;
++        }
++
++        /**
++         * @param node Node to check.
++         * @return {@code True} if this node is a data node for given cache.
++         */
++        public boolean dataNode(ClusterNode node) {
++            return !node.isDaemon() && CU.affinityNode(node, cacheFilter);
++        }
++
++        /**
++         * @param node Node to check.
++         * @return {@code True} if cache is accessible on the given node.
++         */
++        public boolean cacheNode(ClusterNode node) {
++            return !node.isDaemon() && (CU.affinityNode(node, cacheFilter) || clientNodes.containsKey(node.id()));
++        }
++
++        /**
++         * @param node Node to check.
++         * @return {@code True} if near cache is present on the given nodes.
++         */
++        public boolean nearNode(ClusterNode node) {
++            if (node.isDaemon())
++                return false;
++
++            if (CU.affinityNode(node, cacheFilter))
++                return nearEnabled;
++
++            Boolean near = clientNodes.get(node.id());
++
++            return near != null && near;
++        }
++
++        /**
++         * @param node Node to check.
++         * @return {@code True} if client cache is present on the given nodes.
++         */
++        public boolean clientNode(ClusterNode node) {
++            if (node.isDaemon())
++                return false;
++
++            Boolean near = clientNodes.get(node.id());
++
++            return near != null && !near;
++        }
++    }
++
 +    /** Worker for network segment checks. */
 +    private class SegmentCheckWorker extends GridWorker {
 +        /** */
 +        private final BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
 +
 +        /**
 +         *
 +         */
 +        private SegmentCheckWorker() {
 +            super(ctx.gridName(), "disco-net-seg-chk-worker", GridDiscoveryManager.this.log);
 +
 +            assert hasRslvrs;
 +            assert segChkFreq > 0;
 +        }
 +
 +        /**
 +         *
 +         */
 +        public void scheduleSegmentCheck() {
 +            queue.add(new Object());
 +        }
 +
 +        /** {@inheritDoc} */
 +        @SuppressWarnings("StatementWithEmptyBody")
 +        @Override protected void body() throws InterruptedException {
 +            long lastChk = 0;
 +
 +            while (!isCancelled()) {
 +                Object req = queue.poll(2000, MILLISECONDS);
 +
 +                long now = U.currentTimeMillis();
 +
 +                // Check frequency if segment check has not been requested.
 +                if (req == null && (segChkFreq == 0 || lastChk + segChkFreq >= now)) {
 +                    if (log.isDebugEnabled())
 +                        log.debug("Skipping segment check as it has not been requested and it is not time to check.");
 +
 +                    continue;
 +                }
 +
 +                // We should always check segment if it has been explicitly
 +                // requested (on any node failure or leave).
 +                assert req != null || lastChk + segChkFreq < now;
 +
 +                // Drain queue.
 +                while (queue.poll() != null) {
 +                    // No-op.
 +                }
 +
 +                if (lastSegChkRes.get()) {
 +                    boolean segValid = ctx.segmentation().isValidSegment();
 +
 +                    lastChk = now;
 +
 +                    if (!segValid) {
 +                        discoWrk.addEvent(EVT_NODE_SEGMENTED, AffinityTopologyVersion.NONE, getSpi().getLocalNode(),
 +                            Collections.<ClusterNode>emptyList(), null);
 +
 +                        lastSegChkRes.set(false);
 +                    }
 +
 +                    if (log.isDebugEnabled())
 +                        log.debug("Segment has been checked [requested=" + (req != null) + ", valid=" + segValid + ']');
 +                }
 +            }
 +        }
 +
 +        /** {@inheritDoc} */
 +        @Override public String toString() {
 +            return S.toString(SegmentCheckWorker.class, this);
 +        }
 +    }
 +
 +    /** Worker for discovery events. */
 +    private class DiscoveryWorker extends GridWorker {
 +        /** Event queue. */
 +        private final BlockingQueue<GridTuple5<Integer, AffinityTopologyVersion, ClusterNode, Collection<ClusterNode>,
 +            DiscoveryCustomMessage>> evts = new LinkedBlockingQueue<>();
 +
 +        /** Node segmented event fired flag. */
 +        private boolean nodeSegFired;
 +
 +        /**
 +         *
 +         */
 +        private DiscoveryWorker() {
 +            super(ctx.gridName(), "disco-event-worker", GridDiscoveryManager.this.log);
 +        }
 +
 +        /**
 +         * Method is called when any discovery event occurs.
 +         *
 +         * @param type Discovery event type. See {@link DiscoveryEvent} for more details.
 +         * @param topVer Topology version.
 +         * @param node Remote node this event is connected with.
 +         * @param topSnapshot Topology snapshot.
 +         */
 +        @SuppressWarnings("RedundantTypeArguments")
 +        private void recordEvent(int type, long topVer, ClusterNode node, Collection<ClusterNode> topSnapshot) {
              assert node != null;
  
              if (ctx.event().isRecordable(type)) {
@@@ -2279,90 -2279,90 +2432,6 @@@
          }
      }
  
--    /** Discovery topology future. */
--    private static class DiscoTopologyFuture extends GridFutureAdapter<Long> implements GridLocalEventListener {
--        /** */
--        private static final long serialVersionUID = 0L;
--
--        /** */
--        private GridKernalContext ctx;
--
--        /** Topology await version. */
--        private long awaitVer;
--
--        /** Empty constructor required by {@link Externalizable}. */
--        private DiscoTopologyFuture() {
--            // No-op.
--        }
--
--        /**
--         * @param ctx Context.
--         * @param awaitVer Await version.
--         */
--        private DiscoTopologyFuture(GridKernalContext ctx, long awaitVer) {
--            this.ctx = ctx;
--            this.awaitVer = awaitVer;
--        }
--
--        /** Initializes future. */
--        private void init() {
--            ctx.event().addLocalEventListener(this, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED);
--
--            // Close potential window.
--            long topVer = ctx.discovery().topologyVersion();
--
--            if (topVer >= awaitVer)
--                onDone(topVer);
--        }
--
--        /** {@inheritDoc} */
--        @Override public boolean onDone(@Nullable Long res, @Nullable Throwable err) {
--            if (super.onDone(res, err)) {
--                ctx.event().removeLocalEventListener(this, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED);
--
--                return true;
--            }
--
--            return false;
--        }
--
--        /** {@inheritDoc} */
--        @Override public void onEvent(Event evt) {
--            assert evt.type() == EVT_NODE_JOINED || evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED;
--
--            DiscoveryEvent discoEvt = (DiscoveryEvent)evt;
--
--            if (discoEvt.topologyVersion() >= awaitVer)
--                onDone(discoEvt.topologyVersion());
--        }
--    }
--
--    /**
--     *
--     */
--    private static class Snapshot {
--        /** */
--        private final AffinityTopologyVersion topVer;
--
--        /** */
--        @GridToStringExclude
--        private final DiscoCache discoCache;
--
--        /**
--         * @param topVer Topology version.
--         * @param discoCache Disco cache.
--         */
--        private Snapshot(AffinityTopologyVersion topVer, DiscoCache discoCache) {
--            this.topVer = topVer;
--            this.discoCache = discoCache;
--        }
--
--        /** {@inheritDoc} */
--        @Override public String toString() {
--            return S.toString(Snapshot.class, this);
--        }
--    }
--
      /** Cache for discovery collections. */
      private class DiscoCache {
          /** Remote nodes. */
@@@ -2841,106 -2841,583 +2910,4 @@@
              return S.toString(DiscoCache.class, this, "allNodesWithDaemons", U.toShortString(allNodes));
          }
      }
--
-     /**
 -    /** Cache for discovery collections. */
 -    private class DiscoCache {
 -        /** Remote nodes. */
 -        private final List<ClusterNode> rmtNodes;
 -
 -        /** All nodes. */
 -        private final List<ClusterNode> allNodes;
 -
 -        /** All nodes with at least one cache configured. */
 -        @GridToStringInclude
 -        private final Collection<ClusterNode> allNodesWithCaches;
 -
 -        /** All nodes with at least one cache configured. */
 -        @GridToStringInclude
 -        private final Collection<ClusterNode> rmtNodesWithCaches;
 -
 -        /** Cache nodes by cache name. */
 -        @GridToStringInclude
 -        private final Map<String, Collection<ClusterNode>> allCacheNodes;
 -
 -        /** Remote cache nodes by cache name. */
 -        @GridToStringInclude
 -        private final Map<String, Collection<ClusterNode>> rmtCacheNodes;
 -
 -        /** Cache nodes by cache name. */
 -        @GridToStringInclude
 -        private final Map<String, Collection<ClusterNode>> affCacheNodes;
 -
 -        /** Caches where at least one node has near cache enabled. */
 -        @GridToStringInclude
 -        private final Set<String> nearEnabledCaches;
 -
 -        /** Nodes grouped by version. */
 -        private final NavigableMap<IgniteProductVersion, Collection<ClusterNode>> nodesByVer;
 -
 -        /** Daemon nodes. */
 -        private final List<ClusterNode> daemonNodes;
 -
 -        /** Node map. */
 -        private final Map<UUID, ClusterNode> nodeMap;
 -
 -        /** Local node. */
 -        private final ClusterNode loc;
 -
 -        /** Highest node order. */
 -        private final long maxOrder;
 -
 -        /**
 -         * Cached alive nodes list. As long as this collection doesn't accept {@code null}s use {@link
 -         * #maskNull(String)} before passing raw cache names to it.
 -         */
 -        private final ConcurrentMap<String, Collection<ClusterNode>> aliveCacheNodes;
 -
 -        /**
 -         * Cached alive remote nodes list. As long as this collection doesn't accept {@code null}s use {@link
 -         * #maskNull(String)} before passing raw cache names to it.
 -         */
 -        private final ConcurrentMap<String, Collection<ClusterNode>> aliveRmtCacheNodes;
 -
 -        /**
 -         * Cached alive remote nodes with caches.
 -         */
 -        private final Collection<ClusterNode> aliveNodesWithCaches;
 -
 -        /**
 -         * Cached alive server remote nodes with caches.
 -         */
 -        private final Collection<ClusterNode> aliveSrvNodesWithCaches;
 -
 -        /**
 -         * Cached alive remote server nodes with caches.
 -         */
 -        private final Collection<ClusterNode> aliveRmtSrvNodesWithCaches;
 -
 -        /**
 -         * @param loc Local node.
 -         * @param rmts Remote nodes.
 -         */
 -        private DiscoCache(ClusterNode loc, Collection<ClusterNode> rmts) {
 -            this.loc = loc;
 -
 -            rmtNodes = Collections.unmodifiableList(new ArrayList<>(F.view(rmts, daemonFilter)));
 -
 -            assert !rmtNodes.contains(loc) : "Remote nodes collection shouldn't contain local node" +
 -                " [rmtNodes=" + rmtNodes + ", loc=" + loc + ']';
 -
 -            List<ClusterNode> all = new ArrayList<>(rmtNodes.size() + 1);
 -
 -            if (!loc.isDaemon())
 -                all.add(loc);
 -
 -            all.addAll(rmtNodes);
 -
 -            Collections.sort(all, GridNodeOrderComparator.INSTANCE);
 -
 -            allNodes = Collections.unmodifiableList(all);
 -
 -            Map<String, Collection<ClusterNode>> cacheMap = new HashMap<>(allNodes.size(), 1.0f);
 -            Map<String, Collection<ClusterNode>> rmtCacheMap = new HashMap<>(allNodes.size(), 1.0f);
 -            Map<String, Collection<ClusterNode>> dhtNodesMap = new HashMap<>(allNodes.size(), 1.0f);
 -            Collection<ClusterNode> nodesWithCaches = new HashSet<>(allNodes.size());
 -            Collection<ClusterNode> rmtNodesWithCaches = new HashSet<>(allNodes.size());
 -
 -            aliveCacheNodes = new ConcurrentHashMap8<>(allNodes.size(), 1.0f);
 -            aliveRmtCacheNodes = new ConcurrentHashMap8<>(allNodes.size(), 1.0f);
 -            aliveNodesWithCaches = new ConcurrentSkipListSet<>();
 -            aliveSrvNodesWithCaches = new ConcurrentSkipListSet<>();
 -            aliveRmtSrvNodesWithCaches = new ConcurrentSkipListSet<>();
 -            nodesByVer = new TreeMap<>();
 -
 -            long maxOrder0 = 0;
 -
 -            Set<String> nearEnabledSet = new HashSet<>();
 -
 -            for (ClusterNode node : allNodes) {
 -                assert node.order() != 0 : "Invalid node order [locNode=" + loc + ", node=" + node + ']';
 -
 -                if (node.order() > maxOrder0)
 -                    maxOrder0 = node.order();
 -
 -                boolean hasCaches = false;
 -
 -                for (Map.Entry<String, CachePredicate> entry : registeredCaches.entrySet()) {
 -                    String cacheName = entry.getKey();
 -
 -                    CachePredicate filter = entry.getValue();
 -
 -                    if (filter.cacheNode(node)) {
 -                        nodesWithCaches.add(node);
 -
 -                        if (!loc.id().equals(node.id()))
 -                            rmtNodesWithCaches.add(node);
 -
 -                        addToMap(cacheMap, cacheName, node);
 -
 -                        if (alive(node.id()))
 -                            addToMap(aliveCacheNodes, maskNull(cacheName), node);
 -
 -                        if (filter.dataNode(node))
 -                            addToMap(dhtNodesMap, cacheName, node);
 -
 -                        if (filter.nearNode(node))
 -                            nearEnabledSet.add(cacheName);
 -
 -                        if (!loc.id().equals(node.id())) {
 -                            addToMap(rmtCacheMap, cacheName, node);
 -
 -                            if (alive(node.id()))
 -                                addToMap(aliveRmtCacheNodes, maskNull(cacheName), node);
 -                        }
 -
 -                        hasCaches = true;
 -                    }
 -                }
 -
 -                if (hasCaches) {
 -                    if (alive(node.id())) {
 -                        aliveNodesWithCaches.add(node);
 -
 -                        if (!CU.clientNode(node)) {
 -                            aliveSrvNodesWithCaches.add(node);
 -
 -                            if (!loc.id().equals(node.id()))
 -                                aliveRmtSrvNodesWithCaches.add(node);
 -                        }
 -                    }
 -                }
 -
 -                IgniteProductVersion nodeVer = U.productVersion(node);
 -
 -                // Create collection for this version if it does not exist.
 -                Collection<ClusterNode> nodes = nodesByVer.get(nodeVer);
 -
 -                if (nodes == null) {
 -                    nodes = new ArrayList<>(allNodes.size());
 -
 -                    nodesByVer.put(nodeVer, nodes);
 -                }
 -
 -                nodes.add(node);
 -            }
 -
 -            // Need second iteration to add this node to all previous node versions.
 -            for (ClusterNode node : allNodes) {
 -                IgniteProductVersion nodeVer = U.productVersion(node);
 -
 -                // Get all versions lower or equal node's version.
 -                NavigableMap<IgniteProductVersion, Collection<ClusterNode>> updateView =
 -                    nodesByVer.headMap(nodeVer, false);
 -
 -                for (Collection<ClusterNode> prevVersions : updateView.values())
 -                    prevVersions.add(node);
 -            }
 -
 -            maxOrder = maxOrder0;
 -
 -            allCacheNodes = Collections.unmodifiableMap(cacheMap);
 -            rmtCacheNodes = Collections.unmodifiableMap(rmtCacheMap);
 -            affCacheNodes = Collections.unmodifiableMap(dhtNodesMap);
 -            allNodesWithCaches = Collections.unmodifiableCollection(nodesWithCaches);
 -            this.rmtNodesWithCaches = Collections.unmodifiableCollection(rmtNodesWithCaches);
 -            nearEnabledCaches = Collections.unmodifiableSet(nearEnabledSet);
 -
 -            daemonNodes = Collections.unmodifiableList(new ArrayList<>(
 -                F.view(F.concat(false, loc, rmts), F0.not(daemonFilter))));
 -
 -            Map<UUID, ClusterNode> nodeMap = new HashMap<>(allNodes().size() + daemonNodes.size(), 1.0f);
 -
 -            for (ClusterNode n : F.concat(false, allNodes(), daemonNodes()))
 -                nodeMap.put(n.id(), n);
 -
 -            this.nodeMap = nodeMap;
 -        }
 -
 -        /**
 -         * Adds node to map.
 -         *
 -         * @param cacheMap Map to add to.
 -         * @param cacheName Cache name.
 -         * @param rich Node to add
 -         */
 -        private void addToMap(Map<String, Collection<ClusterNode>> cacheMap, String cacheName, ClusterNode rich) {
 -            Collection<ClusterNode> cacheNodes = cacheMap.get(cacheName);
 -
 -            if (cacheNodes == null) {
 -                cacheNodes = new ArrayList<>(allNodes.size());
 -
 -                cacheMap.put(cacheName, cacheNodes);
 -            }
 -
 -            cacheNodes.add(rich);
 -        }
 -
 -        /** @return Local node. */
 -        ClusterNode localNode() {
 -            return loc;
 -        }
 -
 -        /** @return Remote nodes. */
 -        Collection<ClusterNode> remoteNodes() {
 -            return rmtNodes;
 -        }
 -
 -        /** @return All nodes. */
 -        Collection<ClusterNode> allNodes() {
 -            return allNodes;
 -        }
 -
 -        /**
 -         * Gets collection of nodes which have version equal or greater than {@code ver}.
 -         *
 -         * @param ver Version to check.
 -         * @return Collection of nodes with version equal or greater than {@code ver}.
 -         */
 -        Collection<ClusterNode> elderNodes(IgniteProductVersion ver) {
 -            Map.Entry<IgniteProductVersion, Collection<ClusterNode>> entry = nodesByVer.ceilingEntry(ver);
 -
 -            if (entry == null)
 -                return Collections.emptyList();
 -
 -            return entry.getValue();
 -        }
 -
 -        /**
 -         * @return Versions map.
 -         */
 -        NavigableMap<IgniteProductVersion, Collection<ClusterNode>> versionsMap() {
 -            return nodesByVer;
 -        }
 -
 -        /**
 -         * Gets collection of nodes with at least one cache configured.
 -         *
 -         * @param topVer Topology version (maximum allowed node order).
 -         * @return Collection of nodes.
 -         */
 -        Collection<ClusterNode> allNodesWithCaches(final long topVer) {
 -            return filter(topVer, allNodesWithCaches);
 -        }
 -
 -        /**
 -         * Gets all nodes that have cache with given name.
 -         *
 -         * @param cacheName Cache name.
 -         * @param topVer Topology version.
 -         * @return Collection of nodes.
 -         */
 -        Collection<ClusterNode> cacheNodes(@Nullable String cacheName, final long topVer) {
 -            return filter(topVer, allCacheNodes.get(cacheName));
 -        }
 -
 -        /**
 -         * Gets all remote nodes that have cache with given name.
 -         *
 -         * @param cacheName Cache name.
 -         * @param topVer Topology version.
 -         * @return Collection of nodes.
 -         */
 -        Collection<ClusterNode> remoteCacheNodes(@Nullable String cacheName, final long topVer) {
 -            return filter(topVer, rmtCacheNodes.get(cacheName));
 -        }
 -
 -        /**
 -         * Gets all remote nodes that have at least one cache configured.
 -         *
 -         * @param topVer Topology version.
 -         * @return Collection of nodes.
 -         */
 -        Collection<ClusterNode> remoteCacheNodes(final long topVer) {
 -            return filter(topVer, rmtNodesWithCaches);
 -        }
 -
 -        /**
 -         * Gets all nodes that have cache with given name and should participate in affinity calculation. With
 -         * partitioned cache nodes with near-only cache do not participate in affinity node calculation.
 -         *
 -         * @param cacheName Cache name.
 -         * @param topVer Topology version.
 -         * @return Collection of nodes.
 -         */
 -        Collection<ClusterNode> cacheAffinityNodes(@Nullable String cacheName, final long topVer) {
 -            return filter(topVer, affCacheNodes.get(cacheName));
 -        }
 -
 -        /**
 -         * Gets all alive nodes that have cache with given name.
 -         *
 -         * @param cacheName Cache name.
 -         * @param topVer Topology version.
 -         * @return Collection of nodes.
 -         */
 -        Collection<ClusterNode> aliveCacheNodes(@Nullable String cacheName, final long topVer) {
 -            return filter(topVer, aliveCacheNodes.get(maskNull(cacheName)));
 -        }
 -
 -        /**
 -         * Gets all alive remote nodes that have cache with given name.
 -         *
 -         * @param cacheName Cache name.
 -         * @param topVer Topology version.
 -         * @return Collection of nodes.
 -         */
 -        Collection<ClusterNode> aliveRemoteCacheNodes(@Nullable String cacheName, final long topVer) {
 -            return filter(topVer, aliveRmtCacheNodes.get(maskNull(cacheName)));
 -        }
 -
 -        /**
 -         * Gets all alive remote server nodes with at least one cache configured.
 -         *
 -         * @param topVer Topology version.
 -         * @return Collection of nodes.
 -         */
 -        Collection<ClusterNode> aliveRemoteServerNodesWithCaches(final long topVer) {
 -            return filter(topVer, aliveRmtSrvNodesWithCaches);
 -        }
 -
 -        /**
 -         * Gets all alive server nodes with at least one cache configured.
 -         *
 -         * @param topVer Topology version.
 -         * @return Collection of nodes.
 -         */
 -        Collection<ClusterNode> aliveServerNodesWithCaches(final long topVer) {
 -            return filter(topVer, aliveSrvNodesWithCaches);
 -        }
 -
 -        /**
 -         * Gets all alive remote nodes with at least one cache configured.
 -         *
 -         * @param topVer Topology version.
 -         * @return Collection of nodes.
 -         */
 -        Collection<ClusterNode> aliveNodesWithCaches(final long topVer) {
 -            return filter(topVer, aliveNodesWithCaches);
 -        }
 -
 -        /**
 -         * Checks if cache with given name has at least one node with near cache enabled.
 -         *
 -         * @param cacheName Cache name.
 -         * @return {@code True} if cache with given name has at least one node with near cache enabled.
 -         */
 -        boolean hasNearCache(@Nullable String cacheName) {
 -            return nearEnabledCaches.contains(cacheName);
 -        }
 -
 -        /**
 -         * Removes left node from cached alives lists.
 -         *
 -         * @param leftNode Left node.
 -         */
 -        void updateAlives(ClusterNode leftNode) {
 -            if (leftNode.order() > maxOrder)
 -                return;
 -
 -            filterNodeMap(aliveCacheNodes, leftNode);
 -
 -            filterNodeMap(aliveRmtCacheNodes, leftNode);
 -
 -            aliveNodesWithCaches.remove(leftNode);
 -            aliveSrvNodesWithCaches.remove(leftNode);
 -            aliveRmtSrvNodesWithCaches.remove(leftNode);
 -        }
 -
 -        /**
 -         * Creates a copy of nodes map without the given node.
 -         *
 -         * @param map Map to copy.
 -         * @param exclNode Node to exclude.
 -         */
 -        private void filterNodeMap(ConcurrentMap<String, Collection<ClusterNode>> map, final ClusterNode exclNode) {
 -            for (String cacheName : registeredCaches.keySet()) {
 -                String maskedName = maskNull(cacheName);
 -
 -                while (true) {
 -                    Collection<ClusterNode> oldNodes = map.get(maskedName);
 -
 -                    if (oldNodes == null || oldNodes.isEmpty())
 -                        break;
 -
 -                    Collection<ClusterNode> newNodes = new ArrayList<>(oldNodes);
 -
 -                    if (!newNodes.remove(exclNode))
 -                        break;
 -
 -                    if (map.replace(maskedName, oldNodes, newNodes))
 -                        break;
 -                }
 -            }
 -        }
 -
 -        /**
 -         * Replaces {@code null} with {@code NULL_CACHE_NAME}.
 -         *
 -         * @param cacheName Cache name.
 -         * @return Masked name.
 -         */
 -        private String maskNull(@Nullable String cacheName) {
 -            return cacheName == null ? NULL_CACHE_NAME : cacheName;
 -        }
 -
 -        /**
 -         * @param topVer Topology version.
 -         * @param nodes Nodes.
 -         * @return Filtered collection (potentially empty, but never {@code null}).
 -         */
 -        private Collection<ClusterNode> filter(final long topVer, @Nullable Collection<ClusterNode> nodes) {
 -            if (nodes == null)
 -                return Collections.emptyList();
 -
 -            // If no filtering needed, return original collection.
 -            return nodes.isEmpty() || topVer < 0 || topVer >= maxOrder ?
 -                nodes :
 -                F.view(nodes, new P1<ClusterNode>() {
 -                    @Override public boolean apply(ClusterNode node) {
 -                        return node.order() <= topVer;
 -                    }
 -                });
 -        }
 -
 -        /** @return Daemon nodes. */
 -        Collection<ClusterNode> daemonNodes() {
 -            return daemonNodes;
 -        }
 -
 -        /**
 -         * @param id Node ID.
 -         * @return Node.
 -         */
 -        @Nullable ClusterNode node(UUID id) {
 -            return nodeMap.get(id);
 -        }
 -
 -        /** {@inheritDoc} */
 -        @Override public String toString() {
 -            return S.toString(DiscoCache.class, this, "allNodesWithDaemons", U.toShortString(allNodes));
 -        }
 -    }    /**
--     * Cache predicate.
--     */
--    private static class CachePredicate {
--        /** Cache filter. */
--        private final IgnitePredicate<ClusterNode> cacheFilter;
--
--        /** If near cache is enabled on data nodes. */
--        private final boolean nearEnabled;
--
--        /** Cache mode. */
--        private final CacheMode cacheMode;
--
--        /** Collection of client near nodes. */
--        private final ConcurrentHashMap<UUID, Boolean> clientNodes;
--
--        /**
--         * @param cacheFilter Cache filter.
--         * @param nearEnabled Near enabled flag.
--         * @param cacheMode Cache mode.
--         */
--        private CachePredicate(IgnitePredicate<ClusterNode> cacheFilter, boolean nearEnabled, CacheMode cacheMode) {
--            assert cacheFilter != null;
--
--            this.cacheFilter = cacheFilter;
--            this.nearEnabled = nearEnabled;
--            this.cacheMode = cacheMode;
--
--            clientNodes = new ConcurrentHashMap<>();
--        }
--
--        /**
--         * @param nodeId Near node ID to add.
--         * @param nearEnabled Near enabled flag.
--         * @return {@code True} if new node ID was added.
--         */
--        public boolean addClientNode(UUID nodeId, boolean nearEnabled) {
--            assert nodeId != null;
--
--            Boolean old = clientNodes.putIfAbsent(nodeId, nearEnabled);
--
--            return old == null;
--        }
--
--        /**
--         * @param leftNodeId Left node ID.
--         * @return {@code True} if existing node ID was removed.
--         */
--        public boolean onNodeLeft(UUID leftNodeId) {
--            assert leftNodeId != null;
--
--            Boolean old = clientNodes.remove(leftNodeId);
--
--            return old != null;
--        }
--
--        /**
--         * @param node Node to check.
--         * @return {@code True} if this node is a data node for given cache.
--         */
--        public boolean dataNode(ClusterNode node) {
--            return !node.isDaemon() && CU.affinityNode(node, cacheFilter);
--        }
--
--        /**
--         * @param node Node to check.
--         * @return {@code True} if cache is accessible on the given node.
--         */
--        public boolean cacheNode(ClusterNode node) {
--            return !node.isDaemon() && (CU.affinityNode(node, cacheFilter) || clientNodes.containsKey(node.id()));
--        }
--
--        /**
--         * @param node Node to check.
--         * @return {@code True} if near cache is present on the given nodes.
--         */
--        public boolean nearNode(ClusterNode node) {
--            if (node.isDaemon())
--                return false;
--
--            if (CU.affinityNode(node, cacheFilter))
--                return nearEnabled;
--
--            Boolean near = clientNodes.get(node.id());
--
--            return near != null && near;
--        }
--
--        /**
--         * @param node Node to check.
--         * @return {@code True} if client cache is present on the given nodes.
--         */
--        public boolean clientNode(ClusterNode node) {
--            if (node.isDaemon())
--                return false;
--
--            Boolean near = clientNodes.get(node.id());
--
--            return near != null && !near;
--        }
--    }
  }

http://git-wip-us.apache.org/repos/asf/ignite/blob/ae09fa9f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 8c96c0c,736e630..a9919f8
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@@ -149,52 -149,52 +149,36 @@@ import static org.apache.ignite.transac
  public class GridCacheProcessor extends GridProcessorAdapter {
      /** Null cache name. */
      private static final String NULL_NAME = U.id8(UUID.randomUUID());
--
--    /** Shared cache context. */
--    private GridCacheSharedContext<?, ?> sharedCtx;
--
      /** */
      private final Map<String, GridCacheAdapter<?, ?>> caches;
--
      /** Caches stopped from onKernalStop callback. */
      private final Map<String, GridCacheAdapter> stoppedCaches = new ConcurrentHashMap<>();
--
      /** Map of proxies. */
      private final Map<String, IgniteCacheProxy<?, ?>> jCacheProxies;
--
      /** Map of preload finish futures grouped by preload order. */
      private final NavigableMap<Integer, IgniteInternalFuture<?>> preloadFuts;
--
--    /** Maximum detected rebalance order. */
--    private int maxRebalanceOrder;
--
      /** Caches stop sequence. */
      private final Deque<String> stopSeq;
--
++    /** Count down latch for caches. */
++    private final CountDownLatch cacheStartedLatch = new CountDownLatch(1);
++    /** Shared cache context. */
++    private GridCacheSharedContext<?, ?> sharedCtx;
++    /** Maximum detected rebalance order. */
++    private int maxRebalanceOrder;
      /** Transaction interface implementation. */
      private IgniteTransactionsImpl transactions;
--
      /** Pending cache starts. */
      private ConcurrentMap<String, IgniteInternalFuture> pendingFuts = new ConcurrentHashMap<>();
--
      /** Template configuration add futures. */
      private ConcurrentMap<String, IgniteInternalFuture> pendingTemplateFuts = new ConcurrentHashMap<>();
--
      /** Dynamic caches. */
      private ConcurrentMap<String, DynamicCacheDescriptor> registeredCaches = new ConcurrentHashMap<>();
--
      /** Cache templates. */
      private ConcurrentMap<String, DynamicCacheDescriptor> registeredTemplates = new ConcurrentHashMap<>();
--
      /** */
      private IdentityHashMap<CacheStore, ThreadLocal> sesHolders = new IdentityHashMap<>();
--
      /** Must use JDK marshaller since it is used by discovery to fire custom events. */
      private Marshaller marshaller = new JdkMarshaller();
--
--    /** Count down latch for caches. */
--    private final CountDownLatch cacheStartedLatch = new CountDownLatch(1);
--
      /** */
      private Map<String, DynamicCacheDescriptor> cachesOnDisconnect;
  
@@@ -215,6 -215,6 +199,24 @@@
      }
  
      /**
++     * @param name Name to mask.
++     * @return Masked name.
++     */
++    private static String maskNull(String name) {
++        return name == null ? NULL_NAME : name;
++    }
++
++    /**
++     * @param name Name to unmask.
++     * @return Unmasked name.
++     */
++    @SuppressWarnings("StringEquality")
++    private static String unmaskNull(String name) {
++        // Intentional identity equality.
++        return name == NULL_NAME ? null : name;
++    }
++
++    /**
       * @param internalCache Internal cache flag.
       * @param cfg Initializes cache configuration with proper defaults.
       * @param cacheObjCtx Cache object context.
@@@ -2261,7 -2261,7 +2263,6 @@@
          return F.first(initiateCacheChanges(F.asList(t), false));
      }
  
--
      /**
       * @param cacheName Cache name to close.
       * @return Future that will be completed when cache is closed.
@@@ -3365,21 -3365,21 +3366,54 @@@
      }
  
      /**
--     * @param name Name to mask.
--     * @return Masked name.
++     *
       */
--    private static String maskNull(String name) {
--        return name == null ? NULL_NAME : name;
--    }
++    private static class LocalAffinityFunction implements AffinityFunction {
++        /** */
++        private static final long serialVersionUID = 0L;
  
--    /**
--     * @param name Name to unmask.
--     * @return Unmasked name.
--     */
--    @SuppressWarnings("StringEquality")
--    private static String unmaskNull(String name) {
--        // Intentional identity equality.
--        return name == NULL_NAME ? null : name;
++        /** {@inheritDoc} */
++        @Override public List<List<ClusterNode>> assignPartitions(AffinityFunctionContext affCtx) {
++            ClusterNode locNode = null;
++
++            for (ClusterNode n : affCtx.currentTopologySnapshot()) {
++                if (n.isLocal()) {
++                    locNode = n;
++
++                    break;
++                }
++            }
++
++            if (locNode == null)
++                throw new IgniteException("Local node is not included into affinity nodes for 'LOCAL' cache");
++
++            List<List<ClusterNode>> res = new ArrayList<>(partitions());
++
++            for (int part = 0; part < partitions(); part++)
++                res.add(Collections.singletonList(locNode));
++
++            return Collections.unmodifiableList(res);
++        }
++
++        /** {@inheritDoc} */
++        @Override public void reset() {
++            // No-op.
++        }
++
++        /** {@inheritDoc} */
++        @Override public int partitions() {
++            return 1;
++        }
++
++        /** {@inheritDoc} */
++        @Override public int partition(Object key) {
++            return 0;
++        }
++
++        /** {@inheritDoc} */
++        @Override public void removeNode(UUID nodeId) {
++            // No-op.
++        }
      }
  
      /**
@@@ -3478,55 -3478,95 +3512,4 @@@
              return S.toString(TemplateConfigurationFuture.class, this);
          }
      }
--
--    /**
 -     *
 -     */
 -    @SuppressWarnings("ExternalizableWithoutPublicNoArgConstructor")
 -    private class TemplateConfigurationFuture extends GridFutureAdapter<Object> {
 -        /** Start ID. */
 -        @GridToStringInclude
 -        private IgniteUuid deploymentId;
 -
 -        /** Cache name. */
 -        private String cacheName;
 -
 -        /**
 -         * @param cacheName Cache name.
 -         * @param deploymentId Deployment ID.
 -         */
 -        private TemplateConfigurationFuture(String cacheName, IgniteUuid deploymentId) {
 -            this.deploymentId = deploymentId;
 -            this.cacheName = cacheName;
 -        }
 -
 -        /**
 -         * @return Start ID.
 -         */
 -        public IgniteUuid deploymentId() {
 -            return deploymentId;
 -        }
 -
 -        /** {@inheritDoc} */
 -        @Override public boolean onDone(@Nullable Object res, @Nullable Throwable err) {
 -            // Make sure to remove future before completion.
 -            pendingTemplateFuts.remove(maskNull(cacheName), this);
 -
 -            return super.onDone(res, err);
 -        }
 -
 -        /** {@inheritDoc} */
 -        @Override public String toString() {
 -            return S.toString(TemplateConfigurationFuture.class, this);
 -        }
 -    }    /**
--     *
--     */
--    private static class LocalAffinityFunction implements AffinityFunction {
--        /** */
--        private static final long serialVersionUID = 0L;
--
--        /** {@inheritDoc} */
--        @Override public List<List<ClusterNode>> assignPartitions(AffinityFunctionContext affCtx) {
--            ClusterNode locNode = null;
--
--            for (ClusterNode n : affCtx.currentTopologySnapshot()) {
--                if (n.isLocal()) {
--                    locNode = n;
--
--                    break;
--                }
--            }
--
--            if (locNode == null)
--                throw new IgniteException("Local node is not included into affinity nodes for 'LOCAL' cache");
--
--            List<List<ClusterNode>> res = new ArrayList<>(partitions());
--
--            for (int part = 0; part < partitions(); part++)
--                res.add(Collections.singletonList(locNode));
--
--            return Collections.unmodifiableList(res);
--        }
--
--        /** {@inheritDoc} */
--        @Override public void reset() {
--            // No-op.
--        }
--
--        /** {@inheritDoc} */
--        @Override public int partitions() {
--            return 1;
--        }
--
--        /** {@inheritDoc} */
--        @Override public int partition(Object key) {
--            return 0;
--        }
--
--        /** {@inheritDoc} */
--        @Override public void removeNode(UUID nodeId) {
--            // No-op.
--        }
--    }
  }

http://git-wip-us.apache.org/repos/asf/ignite/blob/ae09fa9f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index f907d5b,698b035..c94cc46
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@@ -136,33 -136,33 +136,24 @@@ import static org.apache.ignite.interna
  public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapter<K, V> {
      /** */
      public static int MAX_ITERATORS = 1000;
--
++    /** */
++    private final ConcurrentMap<UUID, Map<Long, GridFutureAdapter<QueryResult<K, V>>>> qryIters =
++        new ConcurrentHashMap8<>();
++    /** */
++    private final ConcurrentMap<UUID, Map<Long, GridFutureAdapter<FieldsResult>>> fieldsQryRes =
++        new ConcurrentHashMap8<>();
++    /** */
++    private final GridSpinBusyLock busyLock = new GridSpinBusyLock();
      /** */
      protected GridQueryProcessor qryProc;
--
      /** */
      private String space;
--
      /** */
      private int maxIterCnt;
--
      /** */
      private volatile GridCacheQueryMetricsAdapter metrics = new GridCacheQueryMetricsAdapter();
--
--    /** */
--    private final ConcurrentMap<UUID, Map<Long, GridFutureAdapter<QueryResult<K, V>>>> qryIters =
--        new ConcurrentHashMap8<>();
--
--    /** */
--    private final ConcurrentMap<UUID, Map<Long, GridFutureAdapter<FieldsResult>>> fieldsQryRes =
--        new ConcurrentHashMap8<>();
--
      /** */
      private volatile ConcurrentMap<Object, CachedResult<?>> qryResCache = new ConcurrentHashMap8<>();
--
--    /** */
--    private final GridSpinBusyLock busyLock = new GridSpinBusyLock();
--
      /** Event listener. */
      private GridLocalEventListener lsnr;
  
@@@ -172,6 -172,6 +163,17 @@@
      /** */
      private AffinityTopologyVersion qryTopVer;
  
++    /**
++     * @param sndId Sender node ID.
++     * @param reqId Request ID.
++     * @return Recipient ID.
++     */
++    private static Object recipient(UUID sndId, long reqId) {
++        assert sndId != null;
++
++        return new IgniteBiTuple<>(sndId, reqId);
++    }
++
      /** {@inheritDoc} */
      @Override public void start0() throws IgniteCheckedException {
          qryProc = cctx.kernalContext().query();
@@@ -1715,17 -1715,17 +1717,6 @@@
      }
  
      /**
--     * @param sndId Sender node ID.
--     * @param reqId Request ID.
--     * @return Recipient ID.
--     */
--    private static Object recipient(UUID sndId, long reqId) {
--        assert sndId != null;
--
--        return new IgniteBiTuple<>(sndId, reqId);
--    }
--
--    /**
       * @param qryInfo Info.
       * @return Iterator.
       * @throws IgniteCheckedException In case of error.
@@@ -2031,6 -2031,6 +2022,89 @@@
      }
  
      /**
++     * Query for {@link IndexingSpi}.
++     *
++     * @param keepPortable Keep portable flag.
++     * @return Query.
++     */
++    public <R> CacheQuery<R> createSpiQuery(boolean keepPortable) {
++        return new GridCacheQueryAdapter<>(cctx,
++            SPI,
++            null,
++            null,
++            null,
++            null,
++            false,
++            keepPortable);
++    }
++
++    /**
++     * Creates user's predicate based scan query.
++     *
++     * @param filter Scan filter.
++     * @param part Partition.
++     * @param keepPortable Keep portable flag.
++     * @return Created query.
++     */
++    public CacheQuery<Map.Entry<K, V>> createScanQuery(@Nullable IgniteBiPredicate<K, V> filter,
++        @Nullable Integer part, boolean keepPortable) {
++
++        return new GridCacheQueryAdapter<>(cctx,
++            SCAN,
++            null,
++            null,
++            (IgniteBiPredicate<Object, Object>)filter,
++            part,
++            false,
++            keepPortable);
++    }
++
++    /**
++     * Creates user's full text query, queried class, and query clause. For more information refer to {@link CacheQuery}
++     * documentation.
++     *
++     * @param clsName Query class name.
++     * @param search Search clause.
++     * @param keepPortable Keep portable flag.
++     * @return Created query.
++     */
++    public CacheQuery<Map.Entry<K, V>> createFullTextQuery(String clsName,
++        String search, boolean keepPortable) {
++        A.notNull("clsName", clsName);
++        A.notNull("search", search);
++
++        return new GridCacheQueryAdapter<>(cctx,
++            TEXT,
++            clsName,
++            search,
++            null,
++            null,
++            false,
++            keepPortable);
++    }
++
++    /**
++     * Creates user's SQL fields query for given clause. For more information refer to {@link CacheQuery}
++     * documentation.
++     *
++     * @param qry Query.
++     * @param keepPortable Keep portable flag.
++     * @return Created query.
++     */
++    public CacheQuery<List<?>> createSqlFieldsQuery(String qry, boolean keepPortable) {
++        A.notNull(qry, "qry");
++
++        return new GridCacheQueryAdapter<>(cctx,
++            SQL_FIELDS,
++            null,
++            qry,
++            null,
++            null,
++            false,
++            keepPortable);
++    }
++
++    /**
       * Metadata job.
       */
      @GridInternal
@@@ -2435,315 -2435,313 +2509,98 @@@
      /**
       *
       */
--    private abstract class AbstractLazySwapEntry {
++    private static class CompoundIterator<T> extends GridIteratorAdapter<T> {
          /** */
--        private K key;
++        private static final long serialVersionUID = 4585888051556166304L;
  
          /** */
--        private V val;
++        private final List<GridIterator<T>> iters;
  
--        /**
--         * @return Key bytes.
--         */
--        protected abstract byte[] keyBytes();
++        /** */
++        private int idx;
  
--        /**
--         * @return Value.
--         * @throws IgniteCheckedException If failed.
--         */
--        protected abstract V unmarshalValue() throws IgniteCheckedException;
++        /** */
++        private GridIterator<T> iter;
  
          /**
--         * @return Key.
++         * @param iters Iterators.
           */
--        K key() {
--            try {
--                if (key != null)
--                    return key;
++        private CompoundIterator(List<GridIterator<T>> iters) {
++            if (iters.isEmpty())
++                throw new IllegalArgumentException();
  
--                key = cctx.toCacheKeyObject(keyBytes()).value(cctx.cacheObjectContext(), false);
++            this.iters = iters;
  
--                return key;
--            }
--            catch (IgniteCheckedException e) {
--                throw new IgniteException(e);
--            }
++            iter = F.first(iters);
          }
  
--        /**
--         * @return Value.
--         */
--        V value() {
--            try {
--                if (val != null)
--                    return val;
++        /** {@inheritDoc} */
++        @Override public boolean hasNextX() throws IgniteCheckedException {
++            if (iter.hasNextX())
++                return true;
  
--                val = unmarshalValue();
++            idx++;
  
--                return val;
--            }
--            catch (IgniteCheckedException e) {
--                throw new IgniteException(e);
++            while (idx < iters.size()) {
++                iter = iters.get(idx);
++
++                if (iter.hasNextX())
++                    return true;
++
++                idx++;
              }
++
++            return false;
          }
  
--        /**
--         * @return TTL.
--         */
--        abstract long timeToLive();
++        /** {@inheritDoc} */
++        @Override public T nextX() throws IgniteCheckedException {
++            if (!hasNextX())
++                throw new NoSuchElementException();
  
--        /**
--         * @return Expire time.
--         */
--        abstract long expireTime();
++            return iter.nextX();
++        }
  
--        /**
--         * @return Version.
--         */
--        abstract GridCacheVersion version();
++        /** {@inheritDoc} */
++        @Override public void removeX() throws IgniteCheckedException {
++            throw new UnsupportedOperationException();
++        }
      }
  
      /**
--     *
++     * Cached result.
       */
--    private class LazySwapEntry extends AbstractLazySwapEntry {
++    private abstract static class CachedResult<R> extends GridFutureAdapter<IgniteSpiCloseableIterator<R>> {
++        /** Absolute position of each recipient. */
++        private final Map<Object, QueueIterator> recipients = new GridLeanMap<>(1);
          /** */
--        private final Map.Entry<byte[], byte[]> e;
++        private CircularQueue<R> queue;
++        /** */
++        private int pruned;
  
          /**
--         * @param e Entry with
++         * @param rcpt ID of the recipient.
           */
--        LazySwapEntry(Map.Entry<byte[], byte[]> e) {
--            this.e = e;
--        }
++        protected CachedResult(Object rcpt) {
++            boolean res = addRecipient(rcpt);
  
--        /** {@inheritDoc} */
--        @Override protected byte[] keyBytes() {
--            return e.getKey();
++            assert res;
          }
  
--        /** {@inheritDoc} */
--        @SuppressWarnings("IfMayBeConditional")
--        @Override protected V unmarshalValue() throws IgniteCheckedException {
--            IgniteBiTuple<byte[], Byte> t = GridCacheSwapEntryImpl.getValue(e.getValue());
++        /**
++         * Close if this result does not have any other recipients.
++         *
++         * @param rcpt ID of the recipient.
++         * @throws IgniteCheckedException If failed.
++         */
++        public void closeIfNotShared(Object rcpt) throws IgniteCheckedException {
++            assert isDone();
  
--            CacheObject obj = cctx.cacheObjects().toCacheObject(cctx.cacheObjectContext(), t.get2(), t.get1());
++            synchronized (recipients) {
++                if (recipients.isEmpty())
++                    return;
  
--            return obj.value(cctx.cacheObjectContext(), false);
--        }
--
--        /** {@inheritDoc} */
--        @Override long timeToLive() {
--            return GridCacheSwapEntryImpl.timeToLive(e.getValue());
--        }
--
--        /** {@inheritDoc} */
--        @Override long expireTime() {
--            return GridCacheSwapEntryImpl.expireTime(e.getValue());
--        }
--
--        /** {@inheritDoc} */
--        @Override GridCacheVersion version() {
--            return GridCacheSwapEntryImpl.version(e.getValue());
--        }
--    }
--
--    /**
--     *
--     */
--    private class LazyOffheapEntry extends AbstractLazySwapEntry {
--        /** */
--        private final T2<Long, Integer> keyPtr;
--
--        /** */
--        private final T2<Long, Integer> valPtr;
--
--        /**
--         * @param keyPtr Key address.
--         * @param valPtr Value address.
--         */
--        private LazyOffheapEntry(T2<Long, Integer> keyPtr, T2<Long, Integer> valPtr) {
--            assert keyPtr != null;
--            assert valPtr != null;
--
--            this.keyPtr = keyPtr;
--            this.valPtr = valPtr;
--        }
--
--        /** {@inheritDoc} */
--        @Override protected byte[] keyBytes() {
--            return U.copyMemory(keyPtr.get1(), keyPtr.get2());
--        }
--
--        /** {@inheritDoc} */
--        @Override protected V unmarshalValue() throws IgniteCheckedException {
--            long ptr = GridCacheOffheapSwapEntry.valueAddress(valPtr.get1(), valPtr.get2());
--
--            CacheObject obj = cctx.fromOffheap(ptr, false);
--
--            V val = CU.value(obj, cctx, false);
--
--            assert val != null;
--
--            return val;
--        }
--
--        /** {@inheritDoc} */
--        @Override long timeToLive() {
--            return GridCacheOffheapSwapEntry.timeToLive(valPtr.get1());
--        }
--
--        /** {@inheritDoc} */
--        @Override long expireTime() {
--            return GridCacheOffheapSwapEntry.expireTime(valPtr.get1());
--        }
--
--        /** {@inheritDoc} */
--        @Override GridCacheVersion version() {
--            return GridCacheOffheapSwapEntry.version(valPtr.get1());
--        }
--    }
--
--    /**
--     *
--     */
--    private class OffheapIteratorClosure
--        extends CX2<T2<Long, Integer>, T2<Long, Integer>, IgniteBiTuple<K, V>> {
--        /** */
--        private static final long serialVersionUID = 7410163202728985912L;
--
--        /** */
--        private IgniteBiPredicate<K, V> filter;
--
--        /** */
--        private boolean keepPortable;
--
--        /**
--         * @param filter Filter.
--         * @param keepPortable Keep portable flag.
--         */
--        private OffheapIteratorClosure(
--            @Nullable IgniteBiPredicate<K, V> filter,
--            boolean keepPortable) {
--            assert filter != null;
--
--            this.filter = filter;
--            this.keepPortable = keepPortable;
--        }
--
--        /** {@inheritDoc} */
--        @Nullable @Override public IgniteBiTuple<K, V> applyx(T2<Long, Integer> keyPtr,
--            T2<Long, Integer> valPtr)
--            throws IgniteCheckedException {
--            LazyOffheapEntry e = new LazyOffheapEntry(keyPtr, valPtr);
--
--            K key = (K)cctx.unwrapPortableIfNeeded(e.key(), keepPortable);
--            V val = (V)cctx.unwrapPortableIfNeeded(e.value(), keepPortable);
--
--            if (!filter.apply(key, val))
--                return null;
--
--            return new IgniteBiTuple<>(e.key(), (V)cctx.unwrapTemporary(e.value()));
--        }
--    }
--
--    /**
--     *
--     */
--    private static class CompoundIterator<T> extends GridIteratorAdapter<T> {
--        /** */
--        private static final long serialVersionUID = 4585888051556166304L;
--
--        /** */
--        private final List<GridIterator<T>> iters;
--
--        /** */
--        private int idx;
--
--        /** */
--        private GridIterator<T> iter;
--
--        /**
--         * @param iters Iterators.
--         */
--        private CompoundIterator(List<GridIterator<T>> iters) {
--            if (iters.isEmpty())
--                throw new IllegalArgumentException();
--
--            this.iters = iters;
--
--            iter = F.first(iters);
--        }
--
--        /** {@inheritDoc} */
--        @Override public boolean hasNextX() throws IgniteCheckedException {
--            if (iter.hasNextX())
--                return true;
--
--            idx++;
--
--            while (idx < iters.size()) {
--                iter = iters.get(idx);
--
--                if (iter.hasNextX())
--                    return true;
--
--                idx++;
--            }
--
--            return false;
--        }
--
--        /** {@inheritDoc} */
--        @Override public T nextX() throws IgniteCheckedException {
--            if (!hasNextX())
--                throw new NoSuchElementException();
--
--            return iter.nextX();
--        }
--
--        /** {@inheritDoc} */
--        @Override public void removeX() throws IgniteCheckedException {
--            throw new UnsupportedOperationException();
--        }
--    }
--
--    /**
--     * Cached result.
--     */
--    private abstract static class CachedResult<R> extends GridFutureAdapter<IgniteSpiCloseableIterator<R>> {
 -        /** Absolute position of each recipient. */
 -        private final Map<Object, QueueIterator> recipients = new GridLeanMap<>(1);
--        /** */
--        private CircularQueue<R> queue;
- 
--        /** */
--        private int pruned;
- 
-         /** Absolute position of each recipient. */
-         private final Map<Object, QueueIterator> recipients = new GridLeanMap<>(1);
--
--        /**
--         * @param rcpt ID of the recipient.
--         */
--        protected CachedResult(Object rcpt) {
--            boolean res = addRecipient(rcpt);
--
--            assert res;
--        }
--
--        /**
--         * Close if this result does not have any other recipients.
--         *
--         * @param rcpt ID of the recipient.
--         * @throws IgniteCheckedException If failed.
--         */
--        public void closeIfNotShared(Object rcpt) throws IgniteCheckedException {
--            assert isDone();
--
--            synchronized (recipients) {
--                if (recipients.isEmpty())
--                    return;
--
--                recipients.remove(rcpt);
++                recipients.remove(rcpt);
  
                  if (recipients.isEmpty())
                      get().close();
@@@ -3022,85 -3020,126 +2879,217 @@@
      }
  
      /**
--     * Query for {@link IndexingSpi}.
       *
--     * @param keepPortable Keep portable flag.
--     * @return Query.
       */
--    public <R> CacheQuery<R> createSpiQuery(boolean keepPortable) {
--        return new GridCacheQueryAdapter<>(cctx,
--            SPI,
--            null,
--            null,
--            null,
--            null,
--            false,
--            keepPortable);
++    private abstract class AbstractLazySwapEntry {
++        /** */
++        private K key;
++
++        /** */
++        private V val;
++
++        /**
++         * @return Key bytes.
++         */
++        protected abstract byte[] keyBytes();
++
++        /**
++         * @return Value.
++         * @throws IgniteCheckedException If failed.
++         */
++        protected abstract V unmarshalValue() throws IgniteCheckedException;
++
++        /**
++         * @return Key.
++         */
++        K key() {
++            try {
++                if (key != null)
++                    return key;
++
++                key = cctx.toCacheKeyObject(keyBytes()).value(cctx.cacheObjectContext(), false);
++
++                return key;
++            }
++            catch (IgniteCheckedException e) {
++                throw new IgniteException(e);
++            }
++        }
++
++        /**
++         * @return Value.
++         */
++        V value() {
++            try {
++                if (val != null)
++                    return val;
++
++                val = unmarshalValue();
++
++                return val;
++            }
++            catch (IgniteCheckedException e) {
++                throw new IgniteException(e);
++            }
++        }
++
++        /**
++         * @return TTL.
++         */
++        abstract long timeToLive();
++
++        /**
++         * @return Expire time.
++         */
++        abstract long expireTime();
++
++        /**
++         * @return Version.
++         */
++        abstract GridCacheVersion version();
      }
  
      /**
--     * Creates user's predicate based scan query.
       *
--     * @param filter Scan filter.
--     * @param part Partition.
--     * @param keepPortable Keep portable flag.
--     * @return Created query.
       */
--    public CacheQuery<Map.Entry<K, V>> createScanQuery(@Nullable IgniteBiPredicate<K, V> filter,
--        @Nullable Integer part, boolean keepPortable) {
++    private class LazySwapEntry extends AbstractLazySwapEntry {
++        /** */
++        private final Map.Entry<byte[], byte[]> e;
  
--        return new GridCacheQueryAdapter<>(cctx,
--            SCAN,
--            null,
--            null,
--            (IgniteBiPredicate<Object, Object>)filter,
--            part,
--            false,
--            keepPortable);
++        /**
++         * @param e Entry with
++         */
++        LazySwapEntry(Map.Entry<byte[], byte[]> e) {
++            this.e = e;
++        }
++
++        /** {@inheritDoc} */
++        @Override protected byte[] keyBytes() {
++            return e.getKey();
++        }
++
++        /** {@inheritDoc} */
++        @SuppressWarnings("IfMayBeConditional")
++        @Override protected V unmarshalValue() throws IgniteCheckedException {
++            IgniteBiTuple<byte[], Byte> t = GridCacheSwapEntryImpl.getValue(e.getValue());
++
++            CacheObject obj = cctx.cacheObjects().toCacheObject(cctx.cacheObjectContext(), t.get2(), t.get1());
++
++            return obj.value(cctx.cacheObjectContext(), false);
++        }
++
++        /** {@inheritDoc} */
++        @Override long timeToLive() {
++            return GridCacheSwapEntryImpl.timeToLive(e.getValue());
++        }
++
++        /** {@inheritDoc} */
++        @Override long expireTime() {
++            return GridCacheSwapEntryImpl.expireTime(e.getValue());
++        }
++
++        /** {@inheritDoc} */
++        @Override GridCacheVersion version() {
++            return GridCacheSwapEntryImpl.version(e.getValue());
++        }
      }
  
      /**
--     * Creates user's full text query, queried class, and query clause. For more information refer to {@link CacheQuery}
--     * documentation.
       *
--     * @param clsName Query class name.
--     * @param search Search clause.
--     * @param keepPortable Keep portable flag.
--     * @return Created query.
       */
--    public CacheQuery<Map.Entry<K, V>> createFullTextQuery(String clsName,
--        String search, boolean keepPortable) {
--        A.notNull("clsName", clsName);
--        A.notNull("search", search);
++    private class LazyOffheapEntry extends AbstractLazySwapEntry {
++        /** */
++        private final T2<Long, Integer> keyPtr;
  
--        return new GridCacheQueryAdapter<>(cctx,
--            TEXT,
--            clsName,
--            search,
--            null,
--            null,
--            false,
--            keepPortable);
++        /** */
++        private final T2<Long, Integer> valPtr;
++
++        /**
++         * @param keyPtr Key address.
++         * @param valPtr Value address.
++         */
++        private LazyOffheapEntry(T2<Long, Integer> keyPtr, T2<Long, Integer> valPtr) {
++            assert keyPtr != null;
++            assert valPtr != null;
++
++            this.keyPtr = keyPtr;
++            this.valPtr = valPtr;
++        }
++
++        /** {@inheritDoc} */
++        @Override protected byte[] keyBytes() {
++            return U.copyMemory(keyPtr.get1(), keyPtr.get2());
++        }
++
++        /** {@inheritDoc} */
++        @Override protected V unmarshalValue() throws IgniteCheckedException {
++            long ptr = GridCacheOffheapSwapEntry.valueAddress(valPtr.get1(), valPtr.get2());
++
++            CacheObject obj = cctx.fromOffheap(ptr, false);
++
++            V val = CU.value(obj, cctx, false);
++
++            assert val != null;
++
++            return val;
++        }
++
++        /** {@inheritDoc} */
++        @Override long timeToLive() {
++            return GridCacheOffheapSwapEntry.timeToLive(valPtr.get1());
++        }
++
++        /** {@inheritDoc} */
++        @Override long expireTime() {
++            return GridCacheOffheapSwapEntry.expireTime(valPtr.get1());
++        }
++
++        /** {@inheritDoc} */
++        @Override GridCacheVersion version() {
++            return GridCacheOffheapSwapEntry.version(valPtr.get1());
++        }
      }
  
      /**
-      * Creates user's SQL fields query for given clause. For more information refer to {@link CacheQuery}
-      * documentation.
       *
-      * @param qry Query.
-      * @param keepPortable Keep portable flag.
-      * @return Created query.
       */
-     public CacheQuery<List<?>> createSqlFieldsQuery(String qry, boolean keepPortable) {
-         A.notNull(qry, "qry");
+     private class OffheapIteratorClosure
+         extends CX2<T2<Long, Integer>, T2<Long, Integer>, IgniteBiTuple<K, V>> {
+         /** */
+         private static final long serialVersionUID = 7410163202728985912L;
  
-         return new GridCacheQueryAdapter<>(cctx,
-             SQL_FIELDS,
-             null,
-             qry,
-             null,
-             null,
-             false,
-             keepPortable);
+         /** */
+         private IgniteBiPredicate<K, V> filter;
+ 
+         /** */
+         private boolean keepPortable;
+ 
+         /**
+          * @param filter Filter.
+          * @param keepPortable Keep portable flag.
+          */
+         private OffheapIteratorClosure(
+             @Nullable IgniteBiPredicate<K, V> filter,
+             boolean keepPortable) {
+             assert filter != null;
+ 
+             this.filter = filter;
+             this.keepPortable = keepPortable;
+         }
+ 
+         /** {@inheritDoc} */
+         @Nullable @Override public IgniteBiTuple<K, V> applyx(T2<Long, Integer> keyPtr,
+             T2<Long, Integer> valPtr)
+             throws IgniteCheckedException {
+             LazyOffheapEntry e = new LazyOffheapEntry(keyPtr, valPtr);
+ 
+             K key = (K)cctx.unwrapPortableIfNeeded(e.key(), keepPortable);
+             V val = (V)cctx.unwrapPortableIfNeeded(e.value(), keepPortable);
+ 
+             if (!filter.apply(key, val))
+                 return null;
+ 
+             return new IgniteBiTuple<>(e.key(), (V)cctx.unwrapTemporary(e.value()));
+         }
 -    }    /**
 -     * Creates user's SQL fields query for given clause. For more information refer to {@link CacheQuery}
 -     * documentation.
 -     *
 -     * @param qry Query.
 -     * @param keepPortable Keep portable flag.
 -     * @return Created query.
 -     */
 -    public CacheQuery<List<?>> createSqlFieldsQuery(String qry, boolean keepPortable) {
 -        A.notNull(qry, "qry");
 -
 -        return new GridCacheQueryAdapter<>(cctx,
 -            SQL_FIELDS,
 -            null,
 -            qry,
 -            null,
 -            null,
 -            false,
 -            keepPortable);
      }
  }

http://git-wip-us.apache.org/repos/asf/ignite/blob/ae09fa9f/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java
index 9117863,df79232..afb599d
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java
@@@ -92,13 -91,13 +92,10 @@@ public class GridRestProcessor extends 
  
      /** Default session timout. */
      private static final int DEFAULT_SES_TIMEOUT = 30_000;
--
--    /** Protocols. */
--    private final Collection<GridRestProtocol> protos = new ArrayList<>();
--
      /** Command handlers. */
      protected final Map<GridRestCommand, GridRestCommandHandler> handlers = new EnumMap<>(GridRestCommand.class);
--
++    /** Protocols. */
++    private final Collection<GridRestProtocol> protos = new ArrayList<>();
      /** */
      private final CountDownLatch startLatch = new CountDownLatch(1);
  
@@@ -132,6 -131,6 +129,85 @@@
      private final long sesTtl;
  
      /**
++     * @param ctx Context.
++     */
++    public GridRestProcessor(GridKernalContext ctx) {
++        super(ctx);
++
++        long sesExpTime0;
++        String sesExpTime = null;
++
++        try {
++            sesExpTime = System.getProperty(IgniteSystemProperties.IGNITE_REST_SESSION_TIMEOUT);
++
++            if (sesExpTime != null)
++                sesExpTime0 = Long.valueOf(sesExpTime) * 1000;
++            else
++                sesExpTime0 = DEFAULT_SES_TIMEOUT;
++        }
++        catch (NumberFormatException ignore) {
++            U.warn(log, "Failed parsing IGNITE_REST_SESSION_TIMEOUT system variable [IGNITE_REST_SESSION_TIMEOUT="
++                + sesExpTime + "]");
++
++            sesExpTime0 = DEFAULT_SES_TIMEOUT;
++        }
++
++        sesTtl = sesExpTime0;
++
++        sesTimeoutCheckerThread = new IgniteThread(ctx.gridName(), "session-timeout-worker",
++            new GridWorker(ctx.gridName(), "session-timeout-worker", log) {
++                @Override protected void body() throws InterruptedException {
++                    while (!isCancelled()) {
++                        Thread.sleep(SES_TIMEOUT_CHECK_DELAY);
++
++                        for (Map.Entry<UUID, Session> e : sesId2Ses.entrySet()) {
++                            Session ses = e.getValue();
++
++                            if (ses.isTimedOut(sesTtl)) {
++                                sesId2Ses.remove(ses.sesId, ses);
++
++                                clientId2SesId.remove(ses.clientId, ses.sesId);
++                            }
++                        }
++                    }
++                }
++            });
++    }
++
++/**
++     * Applies interceptor to a response object.
++     * Specially handler {@link Map} and {@link Collection} responses.
++     *
++     * @param obj Response object.
++     * @param interceptor Interceptor to apply.
++     * @return Intercepted object.
++     */
++    private static Object interceptSendObject(Object obj, ConnectorMessageInterceptor interceptor) {
++        if (obj instanceof Map) {
++            Map<Object, Object> original = (Map<Object, Object>)obj;
++
++            Map<Object, Object> m = new HashMap<>();
++
++            for (Map.Entry e : original.entrySet())
++                m.put(interceptor.onSend(e.getKey()), interceptor.onSend(e.getValue()));
++
++            return m;
++        }
++        else if (obj instanceof Collection) {
++            Collection<Object> original = (Collection<Object>)obj;
++
++            Collection<Object> c = new ArrayList<>(original.size());
++
++            for (Object e : original)
++                c.add(interceptor.onSend(e));
++
++            return c;
++        }
++        else
++            return interceptor.onSend(obj);
++    }
++
++    /**
       * @param req Request.
       * @return Future.
       */
@@@ -386,52 -385,52 +462,6 @@@
          }
      }
  
--    /**
--     * @param ctx Context.
--     */
--    public GridRestProcessor(GridKernalContext ctx) {
--        super(ctx);
--
--        long sesExpTime0;
--        String sesExpTime = null;
--
--        try {
--            sesExpTime = System.getProperty(IgniteSystemProperties.IGNITE_REST_SESSION_TIMEOUT);
--
--            if (sesExpTime != null)
--                sesExpTime0 = Long.valueOf(sesExpTime) * 1000;
--            else
--                sesExpTime0 = DEFAULT_SES_TIMEOUT;
--        }
--        catch (NumberFormatException ignore) {
--            U.warn(log, "Failed parsing IGNITE_REST_SESSION_TIMEOUT system variable [IGNITE_REST_SESSION_TIMEOUT="
--                + sesExpTime + "]");
--
--            sesExpTime0 = DEFAULT_SES_TIMEOUT;
--        }
--
--        sesTtl = sesExpTime0;
--
--        sesTimeoutCheckerThread = new IgniteThread(ctx.gridName(), "session-timeout-worker",
--            new GridWorker(ctx.gridName(), "session-timeout-worker", log) {
--                @Override protected void body() throws InterruptedException {
--                    while (!isCancelled()) {
--                        Thread.sleep(SES_TIMEOUT_CHECK_DELAY);
--
--                        for (Map.Entry<UUID, Session> e : sesId2Ses.entrySet()) {
--                            Session ses = e.getValue();
--
--                            if (ses.isTimedOut(sesTtl)) {
--                                sesId2Ses.remove(ses.sesId, ses);
--
--                                clientId2SesId.remove(ses.clientId, ses.sesId);
--                            }
--                        }
--                    }
--                }
--            });
--    }
--
      /** {@inheritDoc} */
      @Override public void start() throws IgniteCheckedException {
          if (isRestEnabled()) {
@@@ -516,7 -517,54 +546,7 @@@
          }
      }
  
 -    /**
 -     * Applies {@link ConnectorMessageInterceptor}
 -     * from {@link ConnectorConfiguration#getMessageInterceptor()} ()}
 -     * to all user parameters in the request.
 -     *
 -     * @param req Client request.
 -     */
 -    private void interceptRequest(GridRestRequest req) {
 -        ConnectorMessageInterceptor interceptor = config().getMessageInterceptor();
 -
 -        if (interceptor == null)
 -            return;
 -
 -        if (req instanceof GridRestCacheRequest) {
 -            GridRestCacheRequest req0 = (GridRestCacheRequest) req;
 -
 -            req0.key(interceptor.onReceive(req0.key()));
 -            req0.value(interceptor.onReceive(req0.value()));
 -            req0.value2(interceptor.onReceive(req0.value2()));
 -
 -            Map<Object, Object> oldVals = req0.values();
 -
 -            if (oldVals != null) {
 -                Map<Object, Object> newVals = U.newHashMap(oldVals.size());
 -
 -                for (Map.Entry<Object, Object> e : oldVals.entrySet())
 -                    newVals.put(interceptor.onReceive(e.getKey()), interceptor.onReceive(e.getValue()));
 -
 -                req0.values(U.sealMap(newVals));
 -            }
 -        }
 -        else if (req instanceof GridRestTaskRequest) {
 -            GridRestTaskRequest req0 = (GridRestTaskRequest) req;
 -
 -            List<Object> oldParams = req0.params();
 -
 -            if (oldParams != null) {
 -                Collection<Object> newParams = new ArrayList<>(oldParams.size());
 -
 -                for (Object o : oldParams)
 -                    newParams.add(interceptor.onReceive(o));
 -
 -                req0.params(U.sealList(newParams));
 -            }
 -        }
 -    }
 -
--    /**
++        /**
       * Applies {@link ConnectorMessageInterceptor}
       * from {@link ConnectorConfiguration#getMessageInterceptor()} ()}
       * to all user parameters in the request.
@@@ -562,8 -610,8 +592,7 @@@
              }
          }
      }
--
--    /**
++/**
       * Applies {@link ConnectorMessageInterceptor} from
       * {@link ConnectorConfiguration#getMessageInterceptor()}
       * to all user objects in the response.


Mime
View raw message