ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From agoncha...@apache.org
Subject [4/7] ignite git commit: Merge master into ignite-3477
Date Tue, 14 Mar 2017 14:43:11 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index ba48e7a,a97b0fe..6bf96b0
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@@ -329,17 -329,20 +329,22 @@@ public class GridNearTxLocal extends Gr
          final boolean skipVals,
          final boolean needVer,
          boolean keepBinary,
 +        boolean recovery,
+         final ExpiryPolicy expiryPlc,
          final GridInClosure3<KeyCacheObject, Object, GridCacheVersion> c
      ) {
+         IgniteCacheExpiryPolicy expiryPlc0 = optimistic() ?
+             accessPolicy(cacheCtx, keys) :
+             cacheCtx.cache().expiryPolicy(expiryPlc);
+ 
          if (cacheCtx.isNear()) {
              return cacheCtx.nearTx().txLoadAsync(this,
                  topVer,
                  keys,
                  readThrough,
                  /*deserializeBinary*/false,
 +                recovery,
-                 accessPolicy(cacheCtx, keys),
+                 expiryPlc0,
                  skipVals,
                  needVer).chain(new C1<IgniteInternalFuture<Map<Object, Object>>, Void>() {
                  @Override public Void apply(IgniteInternalFuture<Map<Object, Object>> f) {
@@@ -402,8 -404,7 +407,8 @@@
                      CU.subjectId(this, cctx),
                      resolveTaskName(),
                      /*deserializeBinary*/false,
 +                    recovery,
-                     accessPolicy(cacheCtx, keys),
+                     expiryPlc0,
                      skipVals,
                      /*can remap*/true,
                      needVer,
@@@ -435,9 -436,9 +440,10 @@@
                  async,
                  keys,
                  skipVals,
--                keepBinary,
                  needVer,
++                keepBinary,
 +                recovery,
+                 expiryPlc,
                  c);
          }
      }

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
index 7aaa476,f86df2f..da92692
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
@@@ -407,12 -412,13 +408,11 @@@ public class GridLocalAtomicCache<K, V
  
                      if (entry != null) {
                          CacheObject v;
-                         GridCacheVersion ver;
  
                          if (needVer) {
-                             T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned(
+                             EntryGetResult res = entry.innerGetVersioned(
                                  null,
                                  null,
 -                                /*swap*/swapOrOffheap,
 -                                /*unmarshal*/true,
                                  /**update-metrics*/false,
                                  /*event*/!skipVals,
                                  subjId,

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index 6731179,7af3485..da5b326
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@@ -365,12 -380,22 +377,21 @@@ public abstract class GridCacheQueryMan
       * @param key Key.
       * @throws IgniteCheckedException If failed.
       */
 -    public void onSwap(CacheObject key) throws IgniteCheckedException {
 +    public void onSwap(KeyCacheObject key, int partId) throws IgniteCheckedException {
+         if(!enabled)
+             return;
 -
          if (!enterBusy())
              return; // Ignore index update when node is stopping.
  
          try {
-             qryProc.onSwap(space, key, partId);
+             if (isIndexingSpiEnabled()) {
+                 Object key0 = unwrapIfNeeded(key, cctx.cacheObjectContext());
+ 
+                 cctx.kernalContext().indexing().onSwap(space, key0);
+             }
+ 
+             if(qryProcEnabled)
 -                qryProc.onSwap(space, key);
++                qryProc.onSwap(space, key, partId);
          }
          finally {
              leaveBusy();
@@@ -384,12 -417,26 +413,25 @@@
       * @param val Value
       * @throws IgniteCheckedException If failed.
       */
 -    public void onUnswap(CacheObject key, CacheObject val) throws IgniteCheckedException {
 +    public void onUnswap(KeyCacheObject key, int partId, CacheObject val) throws IgniteCheckedException {
+         if(!enabled)
+             return;
 -
          if (!enterBusy())
              return; // Ignore index update when node is stopping.
  
          try {
-             qryProc.onUnswap(space, key, partId, val);
+             if (isIndexingSpiEnabled()) {
+                 CacheObjectContext coctx = cctx.cacheObjectContext();
+ 
+                 Object key0 = unwrapIfNeeded(key, coctx);
+ 
+                 Object val0 = unwrapIfNeeded(val, coctx);
+ 
+                 cctx.kernalContext().indexing().onUnswap(space, key0, val0);
+             }
+ 
+             if(qryProcEnabled)
 -                qryProc.onUnswap(space, key, val);
++                qryProc.onUnswap(space, key, partId, val);
          }
          finally {
              leaveBusy();
@@@ -434,10 -470,21 +476,21 @@@
              return; // No-op.
  
          if (!enterBusy())
 -            return; // Ignore index update when node is stopping.
 +            throw new NodeStoppingException("Operation has been cancelled (node is stopping).");
  
          try {
-             qryProc.store(space, key, partId, prevVal, prevVer, val, ver, expirationTime, link);
+             if (isIndexingSpiEnabled()) {
+                 CacheObjectContext coctx = cctx.cacheObjectContext();
+ 
+                 Object key0 = unwrapIfNeeded(key, coctx);
+ 
+                 Object val0 = unwrapIfNeeded(val, coctx);
+ 
+                 cctx.kernalContext().indexing().store(space, key0, val0, expirationTime);
+             }
+ 
+             if(qryProcEnabled)
 -                qryProc.store(space, key, val, CU.versionToBytes(ver), expirationTime);
++                qryProc.store(space, key, partId, prevVal, prevVer, val, ver, expirationTime, link);
          }
          finally {
              invalidateResultCache();
@@@ -464,7 -509,14 +517,14 @@@
              return; // Ignore index update when node is stopping.
  
          try {
-             qryProc.remove(space, key, partId, val, ver);
+             if (isIndexingSpiEnabled()) {
+                 Object key0 = unwrapIfNeeded(key, cctx.cacheObjectContext());
+ 
+                 cctx.kernalContext().indexing().remove(space, key0);
+             }
+ 
+             if(qryProcEnabled)
 -                qryProc.remove(space, key, val);
++                qryProc.remove(space, key, partId, val, ver);
          }
          finally {
              invalidateResultCache();
@@@ -2966,32 -3582,40 +3036,36 @@@
          private void advance() {
              IgniteBiTuple<K, V> next0 = null;
  
 -            while (entryIt.hasNext()) {
 -                next0 = null;
 +            while (it.hasNext()) {
 +                CacheDataRow row = it.next();
  
 -                GridCacheEntryEx entry = entryIt.next();
 +                KeyCacheObject key = row.key();
  
+                 if (entry.deleted())
+                     continue;
+ 
+                 KeyCacheObject key = entry.key();
                  CacheObject val;
  
 -                try {
 -                    if (heapOnly)
 -                        val = entry.peek(true, false, false, expiryPlc);
 -                    else
 -                        val = value(entry, entry.key());
 -                }
 -                catch (GridCacheEntryRemovedException ignore) {
 -                    assert heapOnly;
 -
 -                    continue;
 -                }
 -                catch (IgniteCheckedException e) {
 -                    if (log.isDebugEnabled())
 -                        log.debug("Failed to peek value: " + e);
 +                if (expiryPlc != null) {
 +                    try {
 +                        val = value(key);
 +                    }
 +                    catch (IgniteCheckedException e) {
 +                        if (log.isDebugEnabled())
 +                            log.debug("Failed to peek value: " + e);
  
 -                    val = null;
 -                }
 +                        val = null;
 +                    }
  
 -                if (dht != null && expiryPlc != null && expiryPlc.readyToFlush(100)) {
 -                    dht.sendTtlUpdateRequest(expiryPlc);
 +                    if (dht != null && expiryPlc.readyToFlush(100)) {
 +                        dht.sendTtlUpdateRequest(expiryPlc);
  
 -                    expiryPlc = cctx.cache().expiryPolicy(plc);
 +                        expiryPlc = cctx.cache().expiryPolicy(plc);
 +                    }
                  }
 +                else
 +                    val = row.value();
  
                  if (val != null) {
                      boolean keepBinary0 = !locNode || keepBinary;
@@@ -3043,22 -3670,20 +3117,18 @@@
           * @return Value.
           * @throws IgniteCheckedException If failed to peek value.
           */
 -        private CacheObject value(GridCacheEntryEx entry, KeyCacheObject key) throws IgniteCheckedException {
 +        private CacheObject value(KeyCacheObject key) throws IgniteCheckedException {
-             GridCacheEntryEx entry = null;
- 
-             try {
-                 entry = cache.entryEx(key);
+             while (true) {
+                 try {
 -                    if (entry == null)
 -                        entry = cache.entryEx(key);
++                    GridCacheEntryEx entry = cache.entryEx(key);
  
-                 entry.unswap();
 -                    if (expiryPlc != null)
 -                        entry.unswap();
++                    entry.unswap();
  
-                 return entry.peek(true, false, topVer, expiryPlc);
-             }
-             catch (GridCacheEntryRemovedException ignore) {
-                 return null;
-             }
-             finally {
-                 if (entry != null)
-                     cctx.evicts().touch(entry, topVer);
 -                    return entry.peek(true, true, true, topVer, expiryPlc);
++                    return entry.peek(true, true, topVer, expiryPlc);
+                 }
+                 catch (GridCacheEntryRemovedException ignore) {
 -                    entry = null;
++                    // No-op.
+                 }
              }
          }
      }

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index 41eec6a,b3f0684..a540fdd
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@@ -864,21 -882,21 +885,22 @@@ public class CacheContinuousQueryHandle
                  GridCacheAffinityManager aff = cctx.affinity();
  
                  if (initUpdCntrsPerNode != null) {
-                     for (ClusterNode node : aff.nodes(partId, initTopVer)) {
+                     for (ClusterNode node : aff.nodesByPartition(partId, initTopVer)) {
 -                        Map<Integer, Long> map = initUpdCntrsPerNode.get(node.id());
 +                        Map<Integer, T2<Long, Long>> map = initUpdCntrsPerNode.get(node.id());
  
                          if (map != null) {
 -                            partCntr = map.get(partId);
 +                            partCntrs = map.get(partId);
  
                              break;
                          }
                      }
                  }
                  else if (initUpdCntrs != null)
 -                    partCntr = initUpdCntrs.get(partId);
 +                    partCntrs = initUpdCntrs.get(partId);
              }
  
-             rec = new PartitionRecovery(ctx.log(getClass()), initTopVer0, partCntrs != null ? partCntrs.get2() : null);
 -            rec = new PartitionRecovery(ctx.log(CU.CONTINUOUS_QRY_LOG_CATEGORY), initTopVer0, partCntr);
++            rec = new PartitionRecovery(ctx.log(CU.CONTINUOUS_QRY_LOG_CATEGORY), initTopVer0,
++                partCntrs != null ? partCntrs.get2() : null);
  
              PartitionRecovery oldRec = rcvs.putIfAbsent(partId, rec);
  

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index ba2ab3c,90a68ad..9f3463f
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@@ -1502,20 -1490,19 +1502,20 @@@ public class IgniteTxHandler 
                                          /*expiryPlc*/null,
                                          /*keepBinary*/true);
  
 -                                    if (val == null)
 -                                        val = cacheCtx.toCacheObject(cacheCtx.store().load(null, entry.key()));
 +                                        if (val == null)
 +                                            val = cacheCtx.toCacheObject(cacheCtx.store().load(null, entry.key()));
  
 -                                    if (val != null)
 -                                        entry.readValue(val);
 +                                        if (val != null)
 +                                            entry.readValue(val);
  
-                                         break;
-                                     }
-                                     catch (GridCacheEntryRemovedException ignore) {
-                                         if (log.isDebugEnabled())
-                                             log.debug("Got entry removed exception, will retry: " + entry.txKey());
+                                     break;
+                                 }
+                                 catch (GridCacheEntryRemovedException ignored) {
+                                     if (log.isDebugEnabled())
+                                         log.debug("Got entry removed exception, will retry: " + entry.txKey());
  
 -                                    entry.cached(cacheCtx.cache().entryEx(entry.key(), req.topologyVersion()));
 +                                        entry.cached(cacheCtx.cache().entryEx(entry.key(), req.topologyVersion()));
 +                                    }
                                  }
                              }
                          }

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index f05fdf7,bd806aa..f334b84
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@@ -403,7 -400,7 +404,8 @@@ public abstract class IgniteTxLocalAdap
          boolean skipVals,
          boolean needVer,
          boolean keepBinary,
 +        boolean recovery,
+         final ExpiryPolicy expiryPlc,
          final GridInClosure3<KeyCacheObject, Object, GridCacheVersion> c
      ) {
          assert cacheCtx.isLocal() : cacheCtx.name();
@@@ -431,9 -430,11 +435,9 @@@
                          continue;
  
                      try {
-                         T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned(
+                         EntryGetResult res = entry.innerGetVersioned(
                              null,
                              this,
 -                            /*readSwap*/true,
 -                            /*unmarshal*/true,
                              /*update-metrics*/!skipVals,
                              /*event*/!skipVals,
                              CU.subjectId(this, cctx),
@@@ -1246,9 -1224,11 +1256,9 @@@
                                      F.first(txEntry.entryProcessors()) : null;
  
                              if (needVer) {
-                                 T2<CacheObject, GridCacheVersion> res = txEntry.cached().innerGetVersioned(
+                                 getRes = txEntry.cached().innerGetVersioned(
                                      null,
                                      this,
 -                                    /*swap*/true,
 -                                    /*unmarshal*/true,
                                      /*update-metrics*/true,
                                      /*event*/!skipVals,
                                      CU.subjectId(this, cctx),
@@@ -1499,7 -1497,7 +1522,8 @@@
                  skipVals,
                  needReadVer,
                  !deserializeBinary,
 +                recovery,
+                 expiryPlc,
                  new GridInClosure3<KeyCacheObject, Object, GridCacheVersion>() {
                      @Override public void apply(KeyCacheObject key, Object val, GridCacheVersion loadVer) {
                          if (isRollbackOnly()) {
@@@ -1675,10 -1681,12 +1708,10 @@@
                                              F.first(txEntry.entryProcessors()) : null;
  
                                      if (needVer) {
-                                         T2<CacheObject, GridCacheVersion> res = cached.innerGetVersioned(
+                                         getRes = cached.innerGetVersioned(
                                              null,
                                              IgniteTxLocalAdapter.this,
 -                                            /*swap*/cacheCtx.isSwapOrOffheapEnabled(),
 -                                            /*unmarshal*/true,
 -                                            /*update-metrics*/true,
 +                                            /**update-metrics*/true,
                                              /*event*/!skipVals,
                                              CU.subjectId(IgniteTxLocalAdapter.this, cctx),
                                              transformClo,
@@@ -2041,7 -2055,7 +2081,8 @@@
                      /*read through*/(entryProcessor != null || cacheCtx.config().isLoadPreviousValue()) && !skipStore,
                      retval,
                      keepBinary,
-                     recovery);
++                    recovery,
+                     expiryPlc);
              }
  
              return new GridFinishedFuture<>();
@@@ -2212,7 -2225,7 +2253,8 @@@
                      /*read through*/(invokeMap != null || cacheCtx.config().isLoadPreviousValue()) && !skipStore,
                      retval,
                      keepBinary,
-                     recovery);
++                    recovery,
+                     expiryPlc);
              }
  
              return new GridFinishedFuture<>();
@@@ -2232,8 -2245,7 +2274,9 @@@
       * @param hasFilters {@code True} if filters not empty.
       * @param readThrough Read through flag.
       * @param retval Return value flag.
 +     * @param keepBinary Keep binary flag.
 +     * @param recovery Recovery flag.
+      * @param expiryPlc Expiry policy.
       * @return Load future.
       */
      private IgniteInternalFuture<Void> loadMissing(
@@@ -2248,8 -2260,7 +2291,8 @@@
          final boolean readThrough,
          final boolean retval,
          final boolean keepBinary,
-         final boolean recovery
-     ) {
++        final boolean recovery,
+         final ExpiryPolicy expiryPlc) {
          GridInClosure3<KeyCacheObject, Object, GridCacheVersion> c =
              new GridInClosure3<KeyCacheObject, Object, GridCacheVersion>() {
                  @Override public void apply(KeyCacheObject key,
@@@ -2323,7 -2334,7 +2366,8 @@@
              /*skipVals*/singleRmv,
              needReadVer,
              keepBinary,
 +            recovery,
+             expiryPlc,
              c);
      }
  

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
index d6b09e4,f5687a0..cf1e7e2
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
@@@ -192,6 -194,6 +194,7 @@@ public interface IgniteTxLocalEx extend
          boolean skipVals,
          boolean needVer,
          boolean keepBinary,
 +        boolean recovery,
+         final ExpiryPolicy expiryPlc,
          GridInClosure3<KeyCacheObject, Object, GridCacheVersion> c);
  }

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index 9012214,68479a6..df2f7d9
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@@ -1925,59 -1926,24 +1923,59 @@@ public class DataStreamerImpl<K, V> imp
  
              ExpiryPolicy plc = cctx.expiry();
  
 -            for (Entry<KeyCacheObject, CacheObject> e : entries) {
 -                try {
 -                    e.getKey().finishUnmarshal(cctx.cacheObjectContext(), cctx.deploy().globalLoader());
 +            Collection<Integer> reservedParts = new HashSet<>();
 +            Collection<Integer> ignoredParts = new HashSet<>();
 +
 +            try {
 +                for (Entry<KeyCacheObject, CacheObject> e : entries) {
 +                    cctx.shared().database().checkpointReadLock();
  
 -                    GridCacheEntryEx entry = internalCache.entryEx(e.getKey(), topVer);
 +                    try {
 +                        e.getKey().finishUnmarshal(cctx.cacheObjectContext(), cctx.deploy().globalLoader());
  
 -                    if (plc != null) {
 -                        ttl = CU.toTtl(plc.getExpiryForCreation());
 +                        if (!cctx.isLocal()) {
 +                            int p = cctx.affinity().partition(e.getKey());
  
 -                        if (ttl == CU.TTL_ZERO)
 -                            continue;
 -                        else if (ttl == CU.TTL_NOT_CHANGED)
 -                            ttl = 0;
 +                            if (ignoredParts.contains(p))
 +                                continue;
  
 -                        expiryTime = CU.toExpireTime(ttl);
 -                    }
 +                            if (!reservedParts.contains(p)) {
 +                                GridDhtLocalPartition part = cctx.topology().localPartition(p, topVer, true);
 +
 +                                if (!part.reserve()) {
 +                                    ignoredParts.add(p);
 +
 +                                    continue;
 +                                }
 +                                else {
 +                                    // We must not allow to read from RENTING partitions.
 +                                    if (part.state() == GridDhtPartitionState.RENTING) {
 +                                        part.release();
 +
 +                                        ignoredParts.add(p);
 +
 +                                        continue;
 +                                    }
 +
 +                                    reservedParts.add(p);
 +                                }
 +                            }
 +                        }
 +
 +                        GridCacheEntryEx entry = internalCache.entryEx(e.getKey(), topVer);
 +
 +                        if (plc != null) {
 +                            ttl = CU.toTtl(plc.getExpiryForCreation());
 +
 +                            if (ttl == CU.TTL_ZERO)
 +                                continue;
 +                            else if (ttl == CU.TTL_NOT_CHANGED)
 +                                ttl = 0;
 +
 +                            expiryTime = CU.toExpireTime(ttl);
 +                        }
  
-                     boolean primary = cctx.affinity().primary(cctx.localNode(), entry.key(), topVer);
+                     boolean primary = cctx.affinity().primaryByKey(cctx.localNode(), entry.key(), topVer);
  
                      entry.initialValue(e.getValue(),
                          ver,

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockImpl.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsProcessor.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index 03a6f9b,fddb8df..3607a5b
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@@ -980,39 -867,13 +954,14 @@@ public class GridQueryProcessor extend
                          if (typeDesc == null || !typeDesc.registered())
                              throw new CacheException("Failed to find SQL table for type: " + type);
  
-                         final GridCloseableIterator<IgniteBiTuple<K, V>> i = idx.queryLocalSql(
-                             space,
-                             sqlQry,
-                             F.asList(params),
-                             typeDesc,
-                             idx.backupFilter(requestTopVer.get(), null));
+                         qry.setType(typeDesc.name());
  
                          sendQueryExecutedEvent(
-                             sqlQry,
-                             params,
-                             space);
- 
-                         return new ClIter<Cache.Entry<K, V>>() {
-                             @Override public void close() throws Exception {
-                                 i.close();
-                             }
- 
-                             @Override public boolean hasNext() {
-                                 return i.hasNext();
-                             }
- 
-                             @Override public Cache.Entry<K, V> next() {
-                                 IgniteBiTuple<K, V> t = i.next();
- 
-                                 return new CacheEntryImpl<>(
-                                     (K)cctx.unwrapBinaryIfNeeded(t.getKey(), keepBinary, false),
-                                     (V)cctx.unwrapBinaryIfNeeded(t.getValue(), keepBinary, false));
-                             }
- 
-                             @Override public void remove() {
-                                 throw new UnsupportedOperationException();
-                             }
-                         };
+                             qry.getSql(),
 -                            qry.getArgs());
++                            qry.getArgs(),
++                            cctx.name());
+ 
+                         return idx.queryLocalSql(cctx, qry, idx.backupFilter(requestTopVer.get(), null), keepBinary);
                      }
                  }, true);
          }
@@@ -1093,34 -994,26 +1082,26 @@@
              throw new IllegalStateException("Failed to execute query (grid is stopping).");
  
          try {
-             final boolean keepBinary = cctx.keepBinary();
- 
              return executeQuery(GridCacheQueryType.SQL_FIELDS, qry.getSql(), cctx, new IgniteOutClosureX<QueryCursor<List<?>>>() {
                  @Override public QueryCursor<List<?>> applyx() throws IgniteCheckedException {
-                     final String space = cctx.name();
-                     final String sql = qry.getSql();
-                     final Object[] args = qry.getArgs();
-                     final GridQueryCancel cancel = new GridQueryCancel();
+                     GridQueryCancel cancel = new GridQueryCancel();
  
-                     final GridQueryFieldsResult res = idx.queryLocalSqlFields(space, sql, F.asList(args),
-                         idx.backupFilter(requestTopVer.get(), null), qry.isEnforceJoinOrder(), qry.getTimeout(), cancel);
+                     final QueryCursor<List<?>> cursor = idx.queryLocalSqlFields(cctx, qry,
+                         idx.backupFilter(requestTopVer.get(), null), cancel);
  
-                     QueryCursorImpl<List<?>> cursor = new QueryCursorImpl<>(new Iterable<List<?>>() {
+                     return new QueryCursorImpl<List<?>>(new Iterable<List<?>>() {
                          @Override public Iterator<List<?>> iterator() {
-                             try {
-                                 sendQueryExecutedEvent(sql, args, space);
- 
-                                 return new GridQueryCacheObjectsIterator(res.iterator(), cctx, keepBinary);
-                             }
-                             catch (IgniteCheckedException e) {
-                                 throw new IgniteException(e);
-                             }
-                         }
-                     }, cancel);
 -                            sendQueryExecutedEvent(qry.getSql(), qry.getArgs());
++                            sendQueryExecutedEvent(qry.getSql(), qry.getArgs(), cctx.name());
  
-                     cursor.fieldsMeta(res.metaData());
- 
-                     return cursor;
+                             return cursor.iterator();
+                         }
+                     }, cancel) {
+                         @Override public List<GridQueryFieldMetadata> fieldsMeta() {
+                             if (cursor instanceof QueryCursorImpl)
+                                 return ((QueryCursorImpl)cursor).fieldsMeta();
+                             return super.fieldsMeta();
+                         }
+                     };
                  }
              }, true);
          }
@@@ -1161,15 -1043,7 +1131,14 @@@
              throw new IllegalStateException("Failed to remove from index (grid is stopping).");
  
          try {
-             if (coctx == null)
-                 coctx = cacheObjectContext(space);
 -            idx.remove(space, key, val);
++            CacheObjectContext coctx = cacheObjectContext(space);
 +
 +            TypeDescriptor desc = typeByValue(coctx, key, val, false);
 +
 +            if (desc == null)
 +                return;
 +
 +            idx.remove(space, desc, key, partId, val, ver);
          }
          finally {
              busyLock.leaveBusy();

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryTypeDescriptor.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index 914c3a3,99146aa..13cbf1d
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@@ -305,58 -308,54 +305,59 @@@ public class GridServiceProcessor exten
  
      /** {@inheritDoc} */
      @Override public void onKernalStop(boolean cancel) {
 -        if (ctx.isDaemon())
 -            return;
 -
          busyLock.block();
  
 -        if (!ctx.clientNode())
 -            ctx.event().removeDiscoveryEventListener(topLsnr);
 -
 -        Collection<ServiceContextImpl> ctxs = new ArrayList<>();
 +        try {
 +            if (ctx.isDaemon())
 +                return;
  
 -        synchronized (locSvcs) {
 -            for (Collection<ServiceContextImpl> ctxs0 : locSvcs.values())
 -                ctxs.addAll(ctxs0);
 -        }
 +            if (!ctx.clientNode())
-                 ctx.event().removeLocalEventListener(topLsnr);
++                ctx.event().removeDiscoveryEventListener(topLsnr);
  
 -        for (ServiceContextImpl ctx : ctxs) {
 -            ctx.setCancelled(true);
 +            Collection<ServiceContextImpl> ctxs = new ArrayList<>();
  
 -            Service svc = ctx.service();
 +            synchronized (locSvcs) {
 +                for (Collection<ServiceContextImpl> ctxs0 : locSvcs.values())
 +                    ctxs.addAll(ctxs0);
 +            }
  
 -            if (svc != null)
 -                svc.cancel(ctx);
 +            for (ServiceContextImpl ctx : ctxs) {
 +                ctx.setCancelled(true);
  
 -            ctx.executor().shutdownNow();
 -        }
 +                Service svc = ctx.service();
  
 -        for (ServiceContextImpl ctx : ctxs) {
 -            try {
 -                if (log.isInfoEnabled() && !ctxs.isEmpty())
 -                    log.info("Shutting down distributed service [name=" + ctx.name() + ", execId8=" +
 -                        U.id8(ctx.executionId()) + ']');
 +                if (svc != null)
 +                    svc.cancel(ctx);
  
 -                ctx.executor().awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
 +                ctx.executor().shutdownNow();
              }
 -            catch (InterruptedException ignore) {
 -                Thread.currentThread().interrupt();
  
 -                U.error(log, "Got interrupted while waiting for service to shutdown (will continue stopping node): " +
 -                    ctx.name());
 +            for (ServiceContextImpl ctx : ctxs) {
 +                try {
 +                    if (log.isInfoEnabled() && !ctxs.isEmpty())
 +                        log.info("Shutting down distributed service [name=" + ctx.name() + ", execId8=" +
 +                            U.id8(ctx.executionId()) + ']');
 +
 +                    ctx.executor().awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
 +                }
 +                catch (InterruptedException ignore) {
 +                    Thread.currentThread().interrupt();
 +
 +                    U.error(log, "Got interrupted while waiting for service to shutdown (will continue stopping node): " +
 +                        ctx.name());
 +                }
              }
 -        }
  
 -        U.shutdownNow(GridServiceProcessor.class, depExe, log);
 +            U.shutdownNow(GridServiceProcessor.class, depExe, log);
  
 -        Exception err = new IgniteCheckedException("Operation has been cancelled (node is stopping).");
 +            Exception err = new IgniteCheckedException("Operation has been cancelled (node is stopping).");
  
 -        cancelFutures(depFuts, err);
 -        cancelFutures(undepFuts, err);
 +            cancelFutures(depFuts, err);
 +            cancelFutures(undepFuts, err);
-         }finally {
++        }
++        finally {
 +            busyLock.unblock();
 +        }
  
          if (log.isDebugEnabled())
              log.debug("Stopped service processor.");

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index 1be9f75,74e4450..e7951f9
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@@ -74,7 -74,7 +74,8 @@@ import java.nio.channels.SelectionKey
  import java.nio.channels.Selector;
  import java.nio.charset.Charset;
  import java.nio.file.Files;
 +import java.nio.file.Path;
+ import java.nio.file.Paths;
  import java.security.AccessController;
  import java.security.KeyManagementException;
  import java.security.MessageDigest;

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/visor/compute/VisorGatewayTask.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/visor/compute/VisorGatewayTask.java
index 9a25aa8,a64ec6d..e4c356a
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/compute/VisorGatewayTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/compute/VisorGatewayTask.java
@@@ -285,7 -294,11 +297,10 @@@ public class VisorGatewayTask implement
          }
  
          /** {@inheritDoc} */
 -        @SuppressWarnings("unchecked")
          @Override public Object execute() throws IgniteException {
+             if (fut != null)
+                 return fut.get();
+ 
              String nidsArg = argument(0);
              String taskName = argument(1);
  

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryJob.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryJob.java
index cc1c678,1ac90ad..fbc617b
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryJob.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryJob.java
@@@ -131,9 -131,8 +131,9 @@@ public class VisorQueryJob extends Viso
                  if (scanWithFilter) {
                      boolean caseSensitive = qryTxt.startsWith(SCAN_CACHE_WITH_FILTER_CASE_SENSITIVE);
  
-                     String ptrn = qryTxt.substring(caseSensitive
 -                    String ptrn = qryTxt.substring(
 -                        caseSensitive ? SCAN_CACHE_WITH_FILTER_CASE_SENSITIVE.length() : SCAN_CACHE_WITH_FILTER.length());
++                    String ptrn =  qryTxt.substring(caseSensitive
 +                        ? SCAN_CACHE_WITH_FILTER_CASE_SENSITIVE.length()
 +                        : SCAN_CACHE_WITH_FILTER.length());
  
                      filter = new VisorQueryScanSubstringFilter(caseSensitive, ptrn);
                  }
@@@ -161,7 -160,8 +161,8 @@@
                  SqlFieldsQuery qry = new SqlFieldsQuery(arg.queryTxt());
                  qry.setPageSize(arg.pageSize());
                  qry.setLocal(arg.local());
 -                qry.setDistributedJoins(arg instanceof VisorQueryArgV2 && ((VisorQueryArgV2)arg).distributedJoins());
 -                qry.setEnforceJoinOrder(arg instanceof VisorQueryArgV3 && ((VisorQueryArgV3)arg).enforceJoinOrder());
 +                qry.setDistributedJoins(arg.distributedJoins());
++                qry.setEnforceJoinOrder(arg.enforceJoinOrder());
  
                  long start = U.currentTimeMillis();
  

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/visor/util/VisorTaskUtils.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/visor/util/VisorTaskUtils.java
index 911e832,8e4590e..e1cd7cf
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/util/VisorTaskUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/util/VisorTaskUtils.java
@@@ -52,8 -53,9 +52,8 @@@ import org.apache.ignite.IgniteLogger
  import org.apache.ignite.cache.eviction.EvictionPolicy;
  import org.apache.ignite.cache.eviction.fifo.FifoEvictionPolicyMBean;
  import org.apache.ignite.cache.eviction.lru.LruEvictionPolicyMBean;
- import org.apache.ignite.cache.eviction.random.RandomEvictionPolicyMBean;
+ import org.apache.ignite.cache.eviction.sorted.SortedEvictionPolicyMBean;
  import org.apache.ignite.cluster.ClusterNode;
 -import org.apache.ignite.events.DiscoveryEvent;
  import org.apache.ignite.events.Event;
  import org.apache.ignite.internal.processors.igfs.IgfsEx;
  import org.apache.ignite.internal.util.typedef.F;

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/resources/META-INF/classnames.properties
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreAbstractSelfTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcStoreAbstractMultithreadedSelfTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryAbstractTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
----------------------------------------------------------------------
diff --cc modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
index 2c86322,ba75dcb..ed356fd
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
@@@ -5869,14 -5865,15 +5865,14 @@@ public abstract class GridCacheAbstract
          @Override public void run(int idx) throws Exception {
              GridCacheContext<String, Integer> ctx = ((IgniteKernal)ignite).<String, Integer>internalCache().context();
  
 -            if (ctx.cache().configuration().getMemoryMode() == OFFHEAP_TIERED)
 -                return;
 -
              int size = 0;
  
 +            if (ctx.isNear())
 +                ctx = ctx.near().dht().context();
 +
              for (String key : keys) {
-                 if (ctx.affinity().localNode(key, ctx.discovery().topologyVersionEx())) {
+                 if (ctx.affinity().keyLocalNode(key, ctx.discovery().topologyVersionEx())) {
 -                    GridCacheEntryEx e =
 -                        ctx.isNear() ? ctx.near().dht().peekEx(key) : ctx.cache().peekEx(key);
 +                    GridCacheEntryEx e = ctx.cache().entryEx(key);
  
                      assert e != null : "Entry is null [idx=" + idx + ", key=" + key + ", ctx=" + ctx + ']';
                      assert !e.deleted() : "Entry is deleted: " + e;

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheStopSelfTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
----------------------------------------------------------------------
diff --cc modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
index be3933f,1ecc2d1..49f0c98
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
@@@ -411,9 -417,31 +410,29 @@@ public class GridCacheTestEntryEx exten
      }
  
      /** @inheritDoc */
-     @Nullable @Override public T2<CacheObject, GridCacheVersion> innerGetVersioned(
+     @Override public void clearReserveForLoad(GridCacheVersion ver) {
+         assert false;
+     }
+ 
+     /** @inheritDoc */
+     @Override public EntryGetResult innerGetAndReserveForLoad(
+         boolean readSwap,
+         boolean updateMetrics,
+         boolean evt,
+         UUID subjId,
+         String taskName,
+         @Nullable IgniteCacheExpiryPolicy expiryPlc,
+         boolean keepBinary,
+         @Nullable ReaderArguments args) throws IgniteCheckedException, GridCacheEntryRemovedException {
+         assert false;
+ 
+         return null;
+     }
+ 
+     /** @inheritDoc */
+     @Nullable @Override public EntryGetResult innerGetVersioned(
          @Nullable GridCacheVersion ver,
          IgniteInternalTx tx,
 -        boolean readSwap,
 -        boolean unmarshal,
          boolean updateMetrics,
          boolean evt,
          UUID subjId,

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigVariationsFullApiTest.java
----------------------------------------------------------------------
diff --cc modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigVariationsFullApiTest.java
index b746883,e47a18d..a78d8ef
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigVariationsFullApiTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigVariationsFullApiTest.java
@@@ -5553,12 -5545,10 +5551,12 @@@ public class IgniteCacheConfigVariation
  
              int size = 0;
  
 +            if (ctx.isNear())
 +                ctx = ctx.near().dht().context();
 +
              for (String key : keys) {
-                 if (ctx.affinity().localNode(key, ctx.discovery().topologyVersionEx())) {
+                 if (ctx.affinity().keyLocalNode(key, ctx.discovery().topologyVersionEx())) {
 -                    GridCacheEntryEx e =
 -                        ctx.isNear() ? ctx.near().dht().peekEx(key) : ctx.cache().peekEx(key);
 +                    GridCacheEntryEx e = ctx.cache().entryEx(key);
  
                      assert e != null : "Entry is null [idx=" + idx + ", key=" + key + ", ctx=" + ctx + ']';
                      assert !e.deleted() : "Entry is deleted: " + e;

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCachePeekModesAbstractTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractQueueFailoverDataConsistencySelfTest.java
----------------------------------------------------------------------
diff --cc modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractQueueFailoverDataConsistencySelfTest.java
index 4f02fa2,aeca2fb..c0d7745
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractQueueFailoverDataConsistencySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractQueueFailoverDataConsistencySelfTest.java
@@@ -364,14 -363,10 +364,14 @@@ public abstract class GridCacheAbstract
  
          GridCacheAffinityManager aff = cctx.affinity();
  
 +        CachePeekMode[] modes = new CachePeekMode[]{CachePeekMode.ALL};
 +
          for (int i = 0; i < gridCount(); i++) {
 -            for (GridCacheEntryEx e : ((IgniteKernal)grid(i)).context().cache().internalCache(cctx.name()).allEntries()) {
 -                if (aff.primaryByKey(grid(i).localNode(), e.key(), AffinityTopologyVersion.NONE)
 -                    && e.key().value(cctx.cacheObjectContext(), false) instanceof GridCacheQueueHeaderKey)
 +            for (Cache.Entry e : grid(i).context().cache().internalCache(cctx.name()).localEntries(modes)) {
 +                Object key = e.getKey();
 +
-                 if (aff.primary(grid(i).localNode(), key, AffinityTopologyVersion.NONE)
++                if (aff.primaryByKey(grid(i).localNode(), key, AffinityTopologyVersion.NONE)
 +                    && key instanceof GridCacheQueueHeaderKey)
                      return i;
              }
          }

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePartitionNotLoadedEventSelfTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxCacheWriteSynchronizationModesMultithreadedTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheCrossCacheTxFailoverTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java
----------------------------------------------------------------------
diff --cc modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java
index 177c878,3fd4dd8..fd310c4
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java
@@@ -384,13 -384,17 +384,13 @@@ public class GridCacheAtomicInvalidPart
  
                      GridCacheEntryEx entry = null;
  
 -                    if (memMode == TestMemoryMode.HEAP)
 -                        entry = c.peekEx(k);
 -                    else {
 -                        try {
 -                            entry = c.entryEx(k);
 +                    try {
 +                        entry = c.entryEx(k);
  
 -                            entry.unswap();
 -                        }
 -                        catch (GridDhtInvalidPartitionException ignored) {
 -                            // Skip key.
 -                        }
 +                        entry.unswap();
 +                    }
-                     catch (GridDhtInvalidPartitionException e) {
++                    catch (GridDhtInvalidPartitionException ignored) {
 +                        // Skip key.
                      }
  
                      for (int r = 0; r < 10; r++) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheNearOnlyMultiNodeFullApiSelfTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
----------------------------------------------------------------------
diff --cc modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
index 04004b7,7e3ff5c..e484f1c
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
@@@ -237,10 -237,10 +237,10 @@@ public class GridCacheRebalancingSyncSe
  
          int waitMinorVer = ignite.configuration().isLateAffinityAssignment() ? 1 : 0;
  
-         waitForRebalancing(0, new AffinityTopologyVersion(2, waitMinorVer));
-         waitForRebalancing(1, new AffinityTopologyVersion(2, waitMinorVer));
+         waitForRebalancing(0, 2, waitMinorVer);
+         waitForRebalancing(1, 2, waitMinorVer);
  
 -        awaitPartitionMapExchange(true, true, null);
 +        awaitPartitionMapExchange(true, true, null, true);
  
          checkPartitionMapExchangeFinished();
  
@@@ -258,10 -258,10 +258,10 @@@
  
          startGrid(2);
  
-         waitForRebalancing(1, new AffinityTopologyVersion(4, waitMinorVer));
-         waitForRebalancing(2, new AffinityTopologyVersion(4, waitMinorVer));
+         waitForRebalancing(1, 4, waitMinorVer);
+         waitForRebalancing(2, 4, waitMinorVer);
  
 -        awaitPartitionMapExchange(true, true, null);
 +        awaitPartitionMapExchange(true, true, null, true);
  
          checkPartitionMapExchangeFinished();
  

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadSelfTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/EvictionAbstractTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
----------------------------------------------------------------------
diff --cc modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
index 4ceb1b6,8f08ea9..806fc8e
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
@@@ -36,11 -36,16 +36,14 @@@ import javax.cache.expiry.ExpiryPolicy
  import javax.cache.expiry.ModifiedExpiryPolicy;
  import javax.cache.processor.EntryProcessor;
  import javax.cache.processor.MutableEntry;
+ import org.apache.ignite.Ignite;
  import org.apache.ignite.IgniteCache;
 -import org.apache.ignite.IgniteCheckedException;
  import org.apache.ignite.cache.CacheMemoryMode;
+ import org.apache.ignite.cache.CachePeekMode;
  import org.apache.ignite.cluster.ClusterNode;
  import org.apache.ignite.configuration.CacheConfiguration;
+ import org.apache.ignite.configuration.IgniteConfiguration;
  import org.apache.ignite.configuration.NearCacheConfiguration;
 -import org.apache.ignite.internal.IgniteInterruptedCheckedException;
  import org.apache.ignite.internal.IgniteKernal;
  import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
  import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
@@@ -49,7 -54,9 +52,8 @@@ import org.apache.ignite.internal.proce
  import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException;
  import org.apache.ignite.internal.util.lang.GridAbsPredicate;
  import org.apache.ignite.internal.util.typedef.F;
 -import org.apache.ignite.internal.util.typedef.PAX;
  import org.apache.ignite.internal.util.typedef.internal.S;
+ import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
  import org.apache.ignite.testframework.GridTestUtils;
  import org.apache.ignite.transactions.Transaction;
  import org.apache.ignite.transactions.TransactionConcurrency;

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyWithStoreAbstractTest.java
----------------------------------------------------------------------
diff --cc modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyWithStoreAbstractTest.java
index fe53fc7,1f6ec2d..b5ca2de
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyWithStoreAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyWithStoreAbstractTest.java
@@@ -36,9 -40,11 +40,12 @@@ import org.apache.ignite.internal.Ignit
  import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
  import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
  import org.apache.ignite.internal.processors.cache.IgniteCacheAbstractTest;
 +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException;
  import org.apache.ignite.internal.util.typedef.F;
  import org.apache.ignite.internal.util.typedef.internal.U;
+ import org.apache.ignite.transactions.Transaction;
+ import org.apache.ignite.transactions.TransactionConcurrency;
+ import org.apache.ignite.transactions.TransactionIsolation;
  
  /**
   *

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticDeadlockDetectionTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
----------------------------------------------------------------------
diff --cc modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
index 480928c,3bdf0bd..f0b6621
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
@@@ -61,7 -58,7 +61,8 @@@ import org.apache.ignite.configuration.
  import org.apache.ignite.configuration.IgniteConfiguration;
  import org.apache.ignite.events.Event;
  import org.apache.ignite.internal.GridKernalContext;
 +import org.apache.ignite.internal.IgniteInternalFuture;
+ import org.apache.ignite.internal.IgniteEx;
  import org.apache.ignite.internal.IgniteKernal;
  import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
  import org.apache.ignite.internal.processors.cache.GridCacheAdapter;

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java
----------------------------------------------------------------------
diff --cc modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java
index d225501,03204e2..d721c69
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java
@@@ -77,16 -66,8 +78,18 @@@ public class IgniteCacheTestSuite5 exte
  
          suite.addTestSuite(CacheRebalancingSelfTest.class);
  
 +        // Affinity tests.
 +        suite.addTestSuite(FairAffinityFunctionNodesSelfTest.class);
 +        suite.addTestSuite(FairAffinityFunctionSelfTest.class);
 +        suite.addTestSuite(FairAffinityDynamicCacheSelfTest.class);
 +        suite.addTestSuite(GridCacheAffinityBackupsSelfTest.class);
 +        suite.addTestSuite(IgniteCacheAffinitySelfTest.class);
 +        suite.addTestSuite(AffinityClientNodeSelfTest.class);
 +        suite.addTestSuite(LocalAffinityFunctionTest.class);
 +        suite.addTestSuite(AffinityHistoryCleanupTest.class);
 +
+         suite.addTestSuite(PartitionsExchangeOnDiscoveryHistoryOverflowTest.class);
+ 
          return suite;
      }
  }

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java
----------------------------------------------------------------------
diff --cc modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java
index 9a0f00f,41035ec..c6ada36
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java
@@@ -42,9 -42,9 +42,8 @@@ import org.apache.ignite.internal.manag
  import org.apache.ignite.internal.managers.deployment.GridDeploymentManagerStopSelfTest;
  import org.apache.ignite.internal.managers.discovery.GridDiscoveryManagerAliveCacheSelfTest;
  import org.apache.ignite.internal.managers.discovery.GridDiscoveryManagerAttributesSelfTest;
- import org.apache.ignite.internal.managers.discovery.GridDiscoveryManagerSelfTest;
  import org.apache.ignite.internal.managers.discovery.IgniteTopologyPrintFormatSelfTest;
  import org.apache.ignite.internal.managers.events.GridEventStorageManagerSelfTest;
 -import org.apache.ignite.internal.managers.swapspace.GridSwapSpaceManagerSelfTest;
  import org.apache.ignite.internal.processors.cluster.GridAddressResolverSelfTest;
  import org.apache.ignite.internal.processors.cluster.GridUpdateNotifierSelfTest;
  import org.apache.ignite.internal.processors.port.GridPortProcessorSelfTest;

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/geospatial/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SpatialIndex.java
----------------------------------------------------------------------
diff --cc modules/geospatial/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SpatialIndex.java
index b6060b4,c3a1362..bc27ae7
--- a/modules/geospatial/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SpatialIndex.java
+++ b/modules/geospatial/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SpatialIndex.java
@@@ -124,7 -134,12 +139,12 @@@ public class GridH2SpatialIndex extend
      }
  
      /** {@inheritDoc} */
+     @Override protected int segmentsCount() {
+         return segments.length;
+     }
+ 
+     /** {@inheritDoc} */
 -    @Nullable @Override protected Object doTakeSnapshot() {
 +    @Nullable @Override protected IgniteTree doTakeSnapshot() {
          return null; // TODO We do not support snapshots, but probably this is possible.
      }
  
@@@ -263,7 -282,11 +287,11 @@@
          try {
              checkClosed();
  
-             return new H2Cursor(rowIterator(treeMap.keySet().iterator(), filter));
+             final int seg = threadLocalSegment();
+ 
+             final MVRTreeMap<Long> segment = segments[seg];
+ 
 -            return new GridH2Cursor(rowIterator(segment.keySet().iterator(), filter));
++            return new H2Cursor(rowIterator(segment.keySet().iterator(), filter));
          }
          finally {
              l.unlock();
@@@ -315,12 -333,13 +343,16 @@@
              if (!first)
                  throw DbException.throwInternalError("Spatial Index can only be fetch by ascending order");
  
-             GridCursor<GridH2Row> iter = rowIterator(treeMap.keySet().iterator(), null);
+             final int seg = threadLocalSegment();
+ 
+             final MVRTreeMap<Long> segment = segments[seg];
+ 
 -            Iterator<GridH2Row> iter = rowIterator(segment.keySet().iterator(), null);
++            GridCursor<GridH2Row> iter = rowIterator(segment.keySet().iterator(), null);
  
 -            return new SingleRowCursor(iter.hasNext() ? iter.next() : null);
 +            return new SingleRowCursor(iter.next() ? iter.get() : null);
 +        }
 +        catch (IgniteCheckedException e) {
 +            throw DbException.convert(e);
          }
          finally {
              l.unlock();
@@@ -347,7 -366,11 +379,11 @@@
              if (intersection == null)
                  return find(filter.getSession(), null, null);
  
-             return new H2Cursor(rowIterator(treeMap.findIntersectingKeys(getEnvelope(intersection, 0)), filter));
+             final int seg = threadLocalSegment();
+ 
+             final MVRTreeMap<Long> segment = segments[seg];
+ 
 -            return new GridH2Cursor(rowIterator(segment.findIntersectingKeys(getEnvelope(intersection, 0)), filter));
++            return new H2Cursor(rowIterator(segment.findIntersectingKeys(getEnvelope(intersection, 0)), filter));
          }
          finally {
              l.unlock();

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEndpoint.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1MapTask.java
----------------------------------------------------------------------
diff --cc modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1MapTask.java
index cde6da6,2aa4292..da01d18
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1MapTask.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1MapTask.java
@@@ -28,8 -28,9 +28,9 @@@ import org.apache.hadoop.mapred.Reporte
  import org.apache.hadoop.util.ReflectionUtils;
  import org.apache.ignite.IgniteCheckedException;
  import org.apache.ignite.internal.processors.hadoop.HadoopFileBlock;
 -import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
 -import org.apache.ignite.internal.processors.hadoop.HadoopJob;
 +import org.apache.ignite.hadoop.HadoopInputSplit;
 +import org.apache.ignite.internal.processors.hadoop.HadoopJobEx;
+ import org.apache.ignite.internal.processors.hadoop.HadoopMapperUtils;
  import org.apache.ignite.internal.processors.hadoop.HadoopTaskCancelledException;
  import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
  import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
@@@ -54,52 -55,64 +55,64 @@@ public class HadoopV1MapTask extends Ha
      /** {@inheritDoc} */
      @SuppressWarnings("unchecked")
      @Override public void run(HadoopTaskContext taskCtx) throws IgniteCheckedException {
 -        HadoopJob job = taskCtx.job();
 +        HadoopJobEx job = taskCtx.job();
  
-         HadoopV2TaskContext ctx = (HadoopV2TaskContext)taskCtx;
+         HadoopV2TaskContext taskCtx0 = (HadoopV2TaskContext)taskCtx;
  
-         JobConf jobConf = ctx.jobConf();
+         if (taskCtx.taskInfo().hasMapperIndex())
+             HadoopMapperUtils.mapperIndex(taskCtx.taskInfo().mapperIndex());
+         else
+             HadoopMapperUtils.clearMapperIndex();
  
-         InputFormat inFormat = jobConf.getInputFormat();
+         try {
+             JobConf jobConf = taskCtx0.jobConf();
  
-         HadoopInputSplit split = info().inputSplit();
+             InputFormat inFormat = jobConf.getInputFormat();
  
-         InputSplit nativeSplit;
+             HadoopInputSplit split = info().inputSplit();
  
-         if (split instanceof HadoopFileBlock) {
-             HadoopFileBlock block = (HadoopFileBlock)split;
+             InputSplit nativeSplit;
  
-             nativeSplit = new FileSplit(new Path(block.file().toString()), block.start(), block.length(), EMPTY_HOSTS);
-         }
-         else
-             nativeSplit = (InputSplit)ctx.getNativeSplit(split);
+             if (split instanceof HadoopFileBlock) {
+                 HadoopFileBlock block = (HadoopFileBlock)split;
  
-         assert nativeSplit != null;
+                 nativeSplit = new FileSplit(new Path(block.file().toString()), block.start(), block.length(), EMPTY_HOSTS);
+             }
+             else
+                 nativeSplit = (InputSplit)taskCtx0.getNativeSplit(split);
  
-         Reporter reporter = new HadoopV1Reporter(taskCtx);
+             assert nativeSplit != null;
  
-         HadoopV1OutputCollector collector = null;
+             Reporter reporter = new HadoopV1Reporter(taskCtx);
  
-         try {
-             collector = collector(jobConf, ctx, !job.info().hasCombiner() && !job.info().hasReducer(),
-                 fileName(), ctx.attemptId());
+             HadoopV1OutputCollector collector = null;
  
-             RecordReader reader = inFormat.getRecordReader(nativeSplit, jobConf, reporter);
+             try {
+                 collector = collector(jobConf, taskCtx0, !job.info().hasCombiner() && !job.info().hasReducer(),
+                     fileName(), taskCtx0.attemptId());
  
-             Mapper mapper = ReflectionUtils.newInstance(jobConf.getMapperClass(), jobConf);
+                 RecordReader reader = inFormat.getRecordReader(nativeSplit, jobConf, reporter);
  
-             Object key = reader.createKey();
-             Object val = reader.createValue();
+                 Mapper mapper = ReflectionUtils.newInstance(jobConf.getMapperClass(), jobConf);
  
-             assert mapper != null;
+                 Object key = reader.createKey();
+                 Object val = reader.createValue();
+ 
+                 assert mapper != null;
  
-             try {
                  try {
-                     while (reader.next(key, val)) {
-                         if (isCancelled())
-                             throw new HadoopTaskCancelledException("Map task cancelled.");
+                     try {
+                         while (reader.next(key, val)) {
+                             if (isCancelled())
+                                 throw new HadoopTaskCancelledException("Map task cancelled.");
+ 
+                             mapper.map(key, val, collector, reporter);
+                         }
  
-                         mapper.map(key, val, collector, reporter);
+                         taskCtx.onMapperFinished();
+                     }
+                     finally {
+                         mapper.close();
                      }
                  }
                  finally {

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1ReduceTask.java
----------------------------------------------------------------------
diff --cc modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1ReduceTask.java
index 6b90653,5c1dd15..2bbf8bc
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1ReduceTask.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1ReduceTask.java
@@@ -22,7 -22,8 +22,8 @@@ import org.apache.hadoop.mapred.Reducer
  import org.apache.hadoop.mapred.Reporter;
  import org.apache.hadoop.util.ReflectionUtils;
  import org.apache.ignite.IgniteCheckedException;
 -import org.apache.ignite.internal.processors.hadoop.HadoopJob;
 +import org.apache.ignite.internal.processors.hadoop.HadoopJobEx;
+ import org.apache.ignite.internal.processors.hadoop.HadoopMapperUtils;
  import org.apache.ignite.internal.processors.hadoop.HadoopTaskCancelledException;
  import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
  import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
@@@ -51,34 -52,47 +52,47 @@@ public class HadoopV1ReduceTask extend
      /** {@inheritDoc} */
      @SuppressWarnings("unchecked")
      @Override public void run(HadoopTaskContext taskCtx) throws IgniteCheckedException {
 -        HadoopJob job = taskCtx.job();
 +        HadoopJobEx job = taskCtx.job();
  
-         HadoopV2TaskContext ctx = (HadoopV2TaskContext)taskCtx;
+         HadoopV2TaskContext taskCtx0 = (HadoopV2TaskContext)taskCtx;
  
-         JobConf jobConf = ctx.jobConf();
- 
-         HadoopTaskInput input = taskCtx.input();
- 
-         HadoopV1OutputCollector collector = null;
+         if (!reduce && taskCtx.taskInfo().hasMapperIndex())
+             HadoopMapperUtils.mapperIndex(taskCtx.taskInfo().mapperIndex());
+         else
+             HadoopMapperUtils.clearMapperIndex();
  
          try {
-             collector = collector(jobConf, ctx, reduce || !job.info().hasReducer(), fileName(), ctx.attemptId());
+             JobConf jobConf = taskCtx0.jobConf();
  
-             Reducer reducer;
-             if (reduce) reducer = ReflectionUtils.newInstance(jobConf.getReducerClass(),
-                 jobConf);
-             else reducer = ReflectionUtils.newInstance(jobConf.getCombinerClass(),
-                 jobConf);
+             HadoopTaskInput input = taskCtx.input();
  
-             assert reducer != null;
+             HadoopV1OutputCollector collector = null;
  
              try {
+                 collector = collector(jobConf, taskCtx0, reduce || !job.info().hasReducer(), fileName(), taskCtx0.attemptId());
+ 
+                 Reducer reducer;
+                 if (reduce) reducer = ReflectionUtils.newInstance(jobConf.getReducerClass(),
+                     jobConf);
+                 else reducer = ReflectionUtils.newInstance(jobConf.getCombinerClass(),
+                     jobConf);
+ 
+                 assert reducer != null;
+ 
                  try {
-                     while (input.next()) {
-                         if (isCancelled())
-                             throw new HadoopTaskCancelledException("Reduce task cancelled.");
+                     try {
+                         while (input.next()) {
+                             if (isCancelled())
+                                 throw new HadoopTaskCancelledException("Reduce task cancelled.");
+ 
+                             reducer.reduce(input.key(), input.values(), collector, Reporter.NULL);
+                         }
  
-                         reducer.reduce(input.key(), input.values(), collector, Reporter.NULL);
+                         if (!reduce)
+                             taskCtx.onMapperFinished();
+                     }
+                     finally {
+                         reducer.close();
                      }
                  }
                  finally {

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Context.java
----------------------------------------------------------------------
diff --cc modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Context.java
index 11f2ecc,6202622..5f8c506
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Context.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Context.java
@@@ -30,8 -30,7 +30,7 @@@ import org.apache.hadoop.mapreduce.lib.
  import org.apache.hadoop.mapreduce.task.JobContextImpl;
  import org.apache.ignite.IgniteCheckedException;
  import org.apache.ignite.internal.processors.hadoop.HadoopFileBlock;
 -import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
 +import org.apache.ignite.hadoop.HadoopInputSplit;
- import org.apache.ignite.internal.processors.hadoop.HadoopMapperAwareTaskOutput;
  import org.apache.ignite.internal.processors.hadoop.HadoopTaskCancelledException;
  import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
  import org.apache.ignite.internal.processors.hadoop.HadoopTaskInput;

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataInput.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAbstractMapReduceTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/IgniteHadoopFileSystemClientSelfTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
----------------------------------------------------------------------


Mime
View raw message