ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [2/2] ignite git commit: ignite-2893 For datastructures use invoke instead of explicit txs, got rid of unnecessary outTx usage.
Date Thu, 20 Apr 2017 10:10:38 GMT
ignite-2893 For datastructures use invoke instead of explicit txs, got rid of unnecessary outTx usage.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ee955df9
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ee955df9
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ee955df9

Branch: refs/heads/ignite-2.0
Commit: ee955df9fb80737292aac5f7ad3c82f8f0d8ea8e
Parents: f440480
Author: sboikov <sboikov@gridgain.com>
Authored: Thu Apr 20 13:10:28 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Thu Apr 20 13:10:28 2017 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheAdapter.java      |   4 +-
 .../processors/cache/GridCacheUtils.java        | 117 ++--
 .../datastructures/DataStructuresProcessor.java |  61 +-
 .../datastructures/GridCacheAtomicLongImpl.java | 626 +++++++++++--------
 .../GridCacheAtomicReferenceImpl.java           | 276 ++++----
 .../GridCacheAtomicSequenceImpl.java            |  88 +--
 .../GridCacheAtomicStampedImpl.java             | 293 ++++-----
 .../GridCacheCountDownLatchImpl.java            |  56 +-
 .../datastructures/GridCacheLockImpl.java       |  80 +--
 .../datastructures/GridCacheQueueProxy.java     | 292 +--------
 .../datastructures/GridCacheSemaphoreImpl.java  |  56 +-
 .../datastructures/GridCacheSetProxy.java       | 152 +----
 .../GridTransactionalCacheQueueImpl.java        |   8 +-
 13 files changed, 812 insertions(+), 1297 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/ee955df9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index a3d4c81..5438163 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -2451,7 +2451,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
             validateCacheKey(key);
 
         return syncOp(new SyncOp<EntryProcessorResult<T>>(true) {
-            @Nullable @Override public EntryProcessorResult<T> op(GridNearTxLocal tx)
+            @Override public EntryProcessorResult<T> op(GridNearTxLocal tx)
                 throws IgniteCheckedException {
                 assert topVer == null || tx.implicit();
 
@@ -2489,7 +2489,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
             validateCacheKeys(keys);
 
         return syncOp(new SyncOp<Map<K, EntryProcessorResult<T>>>(keys.size() == 1) {
-            @Nullable @Override public Map<K, EntryProcessorResult<T>> op(GridNearTxLocal tx)
+            @Override public Map<K, EntryProcessorResult<T>> op(GridNearTxLocal tx)
                 throws IgniteCheckedException {
                 Map<? extends K, EntryProcessor<K, V, Object>> invokeMap = F.viewAsMap(keys,
                     new C1<K, EntryProcessor<K, V, Object>>() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/ee955df9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
index 5abb6de..df9c7c4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
@@ -881,31 +881,6 @@ public class GridCacheUtils {
     }
 
     /**
-     * Method executes any Callable out of scope of transaction.
-     * If transaction started by this thread {@code cmd} will be executed in another thread.
-     *
-     * @param cmd Callable.
-     * @param ctx Cache context.
-     * @return T Callable result.
-     * @throws IgniteCheckedException If execution failed.
-     */
-    public static <T> T outTx(Callable<T> cmd, GridCacheContext ctx) throws IgniteCheckedException {
-        if (ctx.tm().inUserTx())
-            return ctx.closures().callLocalSafe(cmd, false).get();
-        else {
-            try {
-                return cmd.call();
-            }
-            catch (IgniteCheckedException | IgniteException | IllegalStateException e) {
-                throw e;
-            }
-            catch (Exception e) {
-                throw new IgniteCheckedException(e);
-            }
-        }
-    }
-
-    /**
      * @param val Value.
      * @param skip Skip value flag.
      * @return Value.
@@ -1604,56 +1579,58 @@ public class GridCacheUtils {
 
     /**
      * @param c Closure to retry.
-     * @param <S> Closure type.
-     * @return Wrapped closure.
-     */
-    public static <S> Callable<S> retryTopologySafe(final Callable<S> c ) {
-        return new Callable<S>() {
-            @Override public S call() throws Exception {
-                IgniteCheckedException err = null;
-
-                for (int i = 0; i < GridCacheAdapter.MAX_RETRIES; i++) {
-                    try {
-                        return c.call();
-                    }
-                    catch (ClusterGroupEmptyCheckedException | ClusterTopologyServerNotFoundException e) {
+     * @throws IgniteCheckedException If failed.
+     * @return Closure result.
+     */
+    public static <S> S retryTopologySafe(final Callable<S> c) throws IgniteCheckedException {
+        IgniteCheckedException err = null;
+
+        for (int i = 0; i < GridCacheAdapter.MAX_RETRIES; i++) {
+            try {
+                return c.call();
+            }
+            catch (ClusterGroupEmptyCheckedException | ClusterTopologyServerNotFoundException e) {
+                throw e;
+            }
+            catch (TransactionRollbackException e) {
+                if (i + 1 == GridCacheAdapter.MAX_RETRIES)
+                    throw e;
+
+                U.sleep(1);
+            }
+            catch (IgniteCheckedException e) {
+                if (i + 1 == GridCacheAdapter.MAX_RETRIES)
+                    throw e;
+
+                if (X.hasCause(e, ClusterTopologyCheckedException.class)) {
+                    ClusterTopologyCheckedException topErr = e.getCause(ClusterTopologyCheckedException.class);
+
+                    if (topErr instanceof ClusterGroupEmptyCheckedException || topErr instanceof
+                        ClusterTopologyServerNotFoundException)
                         throw e;
-                    }
-                    catch (TransactionRollbackException e) {
-                        if (i + 1 == GridCacheAdapter.MAX_RETRIES)
-                            throw e;
 
+                    // IGNITE-1948: remove this check when the issue is fixed
+                    if (topErr.retryReadyFuture() != null)
+                        topErr.retryReadyFuture().get();
+                    else
                         U.sleep(1);
-                    }
-                    catch (IgniteCheckedException e) {
-                        if (i + 1 == GridCacheAdapter.MAX_RETRIES)
-                            throw e;
-
-                        if (X.hasCause(e, ClusterTopologyCheckedException.class)) {
-                            ClusterTopologyCheckedException topErr = e.getCause(ClusterTopologyCheckedException.class);
-
-                            if (topErr instanceof ClusterGroupEmptyCheckedException || topErr instanceof
-                                ClusterTopologyServerNotFoundException)
-                                throw e;
-
-                            // IGNITE-1948: remove this check when the issue is fixed
-                            if (topErr.retryReadyFuture() != null)
-                                topErr.retryReadyFuture().get();
-                            else
-                                U.sleep(1);
-                        }
-                        else if (X.hasCause(e, IgniteTxRollbackCheckedException.class,
-                            CachePartialUpdateCheckedException.class))
-                            U.sleep(1);
-                        else
-                            throw e;
-                    }
                 }
-
-                // Should never happen.
-                throw err;
+                else if (X.hasCause(e, IgniteTxRollbackCheckedException.class,
+                    CachePartialUpdateCheckedException.class))
+                    U.sleep(1);
+                else
+                    throw e;
             }
-        };
+            catch (RuntimeException e) {
+                throw e;
+            }
+            catch (Exception e) {
+                throw new IgniteCheckedException(e);
+            }
+        }
+
+        // Should never happen.
+        throw err;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/ee955df9/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
index 102db96..0a439dc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
@@ -476,7 +476,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter implemen
      * @param name Sequence name.
      * @throws IgniteCheckedException If removing failed.
      */
-    public final void removeSequence(final String name) throws IgniteCheckedException {
+    final void removeSequence(final String name) throws IgniteCheckedException {
         assert name != null;
 
         awaitInitialization();
@@ -488,9 +488,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter implemen
                 dsCacheCtx.gate().enter();
 
                 try {
-                    GridCacheInternal key = new GridCacheInternalKeyImpl(name);
-
-                    removeInternal(key, GridCacheAtomicSequenceValue.class);
+                    dsView.remove(new GridCacheInternalKeyImpl(name));
                 }
                 finally {
                     dsCacheCtx.gate().leave();
@@ -631,7 +629,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter implemen
      * @param name Atomic long name.
      * @throws IgniteCheckedException If removing failed.
      */
-    public final void removeAtomicLong(final String name) throws IgniteCheckedException {
+    final void removeAtomicLong(final String name) throws IgniteCheckedException {
         assert name != null;
         assert dsCacheCtx != null;
 
@@ -642,7 +640,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter implemen
                 dsCacheCtx.gate().enter();
 
                 try {
-                    removeInternal(new GridCacheInternalKeyImpl(name), GridCacheAtomicLongValue.class);
+                    dsView.remove(new GridCacheInternalKeyImpl(name));
                 }
                 finally {
                     dsCacheCtx.gate().leave();
@@ -790,7 +788,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter implemen
      * @param name Atomic reference name.
      * @throws IgniteCheckedException If removing failed.
      */
-    public final void removeAtomicReference(final String name) throws IgniteCheckedException {
+    final void removeAtomicReference(final String name) throws IgniteCheckedException {
         assert name != null;
         assert dsCacheCtx != null;
 
@@ -801,9 +799,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter implemen
                 dsCacheCtx.gate().enter();
 
                 try {
-                    GridCacheInternal key = new GridCacheInternalKeyImpl(name);
-
-                    removeInternal(key, GridCacheAtomicReferenceValue.class);
+                    dsView.remove(new GridCacheInternalKeyImpl(name));
                 }
                 finally {
                     dsCacheCtx.gate().leave();
@@ -894,7 +890,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter implemen
      * @param name Atomic stamped name.
      * @throws IgniteCheckedException If removing failed.
      */
-    public final void removeAtomicStamped(final String name) throws IgniteCheckedException {
+    final void removeAtomicStamped(final String name) throws IgniteCheckedException {
         assert name != null;
         assert dsCacheCtx != null;
 
@@ -905,9 +901,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter implemen
                 dsCacheCtx.gate().enter();
 
                 try {
-                    GridCacheInternal key = new GridCacheInternalKeyImpl(name);
-
-                    removeInternal(key, GridCacheAtomicStampedValue.class);
+                    dsView.remove(new GridCacheInternalKeyImpl(name));
                 }
                 finally {
                     dsCacheCtx.gate().leave();
@@ -1516,43 +1510,6 @@ public final class DataStructuresProcessor extends GridProcessorAdapter implemen
     }
 
     /**
-     * Remove internal entry by key from cache.
-     *
-     * @param key Internal entry key.
-     * @param cls Class of object which will be removed. If cached object has different type exception will be thrown.
-     * @return Method returns true if sequence has been removed and false if it's not cached.
-     * @throws IgniteCheckedException If removing failed or class of object is different to expected class.
-     */
-    private <R> boolean removeInternal(final GridCacheInternal key, final Class<R> cls) throws IgniteCheckedException {
-        return CU.outTx(
-            new Callable<Boolean>() {
-                @Override public Boolean call() throws Exception {
-                    try (GridNearTxLocal tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) {
-                        // Check correctness type of removable object.
-                        R val = cast(dsView.get(key), cls);
-
-                        if (val != null) {
-                            dsView.remove(key);
-
-                            tx.commit();
-                        }
-                        else
-                            tx.setRollbackOnly();
-
-                        return val != null;
-                    }
-                    catch (Error | Exception e) {
-                        U.error(log, "Failed to remove data structure: " + key, e);
-
-                        throw e;
-                    }
-                }
-            },
-            dsCacheCtx
-        );
-    }
-
-    /**
      *
      */
     static class DataStructuresEntryFilter implements CacheEntryEventSerializableFilter<Object, Object> {
@@ -1769,7 +1726,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter implemen
      */
     public static <R> R retry(IgniteLogger log, Callable<R> call) throws IgniteCheckedException {
         try {
-            return GridCacheUtils.retryTopologySafe(call).call();
+            return GridCacheUtils.retryTopologySafe(call);
         }
         catch (IgniteCheckedException e) {
             throw e;

http://git-wip-us.apache.org/repos/asf/ignite/blob/ee955df9/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java
index be718cf..3f07151 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java
@@ -23,23 +23,20 @@ import java.io.InvalidObjectException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
 import java.io.ObjectStreamException;
-import java.util.concurrent.Callable;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.EntryProcessorResult;
+import javax.cache.processor.MutableEntry;
 import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CacheEntryProcessor;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
-import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
-import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.internal.processors.cluster.IgniteChangeGlobalStateSupport;
 import org.apache.ignite.lang.IgniteBiTuple;
 
-import static org.apache.ignite.internal.util.typedef.internal.CU.retryTopologySafe;
-import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
-import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
-
 /**
  * Cache atomic long implementation.
  */
@@ -55,9 +52,6 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ign
             }
         };
 
-    /** Logger. */
-    private IgniteLogger log;
-
     /** Atomic long name. */
     private String name;
 
@@ -76,126 +70,6 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ign
     /** Cache context. */
     private GridCacheContext ctx;
 
-    /** Callable for {@link #get()}. */
-    private final Callable<Long> getCall = new Callable<Long>() {
-        @Override public Long call() throws Exception {
-            GridCacheAtomicLongValue val = atomicView.get(key);
-
-            if (val == null)
-                throw new IgniteCheckedException("Failed to find atomic long with given name: " + name);
-
-            return val.get();
-        }
-    };
-
-    /** Callable for {@link #incrementAndGet()}. */
-    private final Callable<Long> incAndGetCall = retryTopologySafe(new Callable<Long>() {
-        @Override public Long call() throws Exception {
-            try (GridNearTxLocal tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
-                GridCacheAtomicLongValue val = atomicView.get(key);
-
-                if (val == null)
-                    throw new IgniteCheckedException("Failed to find atomic long with given name: " + name);
-
-                long retVal = val.get() + 1;
-
-                val.set(retVal);
-
-                atomicView.put(key, val);
-
-                tx.commit();
-
-                return retVal;
-            }
-            catch (Error | Exception e) {
-                U.error(log, "Failed to increment and get: " + this, e);
-
-                throw e;
-            }
-        }
-    });
-
-    /** Callable for {@link #getAndIncrement()}. */
-    private final Callable<Long> getAndIncCall = retryTopologySafe(new Callable<Long>() {
-        @Override public Long call() throws Exception {
-            try (GridNearTxLocal tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
-                GridCacheAtomicLongValue val = atomicView.get(key);
-
-                if (val == null)
-                    throw new IgniteCheckedException("Failed to find atomic long with given name: " + name);
-
-                long retVal = val.get();
-
-                val.set(retVal + 1);
-
-                atomicView.put(key, val);
-
-                tx.commit();
-
-                return retVal;
-            }
-            catch (Error | Exception e) {
-                U.error(log, "Failed to get and increment: " + this, e);
-
-                throw e;
-            }
-        }
-    });
-
-    /** Callable for {@link #decrementAndGet()}. */
-    private final Callable<Long> decAndGetCall = retryTopologySafe(new Callable<Long>() {
-        @Override public Long call() throws Exception {
-            try (GridNearTxLocal tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
-                GridCacheAtomicLongValue val = atomicView.get(key);
-
-                if (val == null)
-                    throw new IgniteCheckedException("Failed to find atomic long with given name: " + name);
-
-                long retVal = val.get() - 1;
-
-                val.set(retVal);
-
-                atomicView.put(key, val);
-
-                tx.commit();
-
-                return retVal;
-            }
-            catch (Error | Exception e) {
-                U.error(log, "Failed to decrement and get: " + this, e);
-
-                throw e;
-            }
-        }
-    });
-
-    /** Callable for {@link #getAndDecrement()}. */
-    private final Callable<Long> getAndDecCall = retryTopologySafe(new Callable<Long>() {
-        @Override public Long call() throws Exception {
-            try (GridNearTxLocal tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
-                GridCacheAtomicLongValue val = atomicView.get(key);
-
-                if (val == null)
-                    throw new IgniteCheckedException("Failed to find atomic long with given name: " + name);
-
-                long retVal = val.get();
-
-                val.set(retVal - 1);
-
-                atomicView.put(key, val);
-
-                tx.commit();
-
-                return retVal;
-            }
-            catch (Error | Exception e) {
-                U.error(log, "Failed to get and decrement and get: " + this, e);
-
-                throw e;
-            }
-        }
-    });
-
     /**
      * Empty constructor required by {@link Externalizable}.
      */
@@ -211,8 +85,10 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ign
      * @param atomicView Atomic projection.
      * @param ctx CacheContext.
      */
-    public GridCacheAtomicLongImpl(String name, GridCacheInternalKey key,
-        IgniteInternalCache<GridCacheInternalKey, GridCacheAtomicLongValue> atomicView, GridCacheContext ctx) {
+    public GridCacheAtomicLongImpl(String name,
+        GridCacheInternalKey key,
+        IgniteInternalCache<GridCacheInternalKey, GridCacheAtomicLongValue> atomicView,
+        GridCacheContext ctx) {
         assert key != null;
         assert atomicView != null;
         assert ctx != null;
@@ -222,8 +98,6 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ign
         this.key = key;
         this.atomicView = atomicView;
         this.name = name;
-
-        log = ctx.logger(getClass());
     }
 
     /** {@inheritDoc} */
@@ -236,7 +110,12 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ign
         checkRemoved();
 
         try {
-            return CU.outTx(getCall, ctx);
+            GridCacheAtomicLongValue val = atomicView.get(key);
+
+            if (val == null)
+                throw new IgniteException("Failed to find atomic long: " + name);
+
+            return val.get();
         }
         catch (IgniteCheckedException e) {
             throw U.convertException(e);
@@ -248,7 +127,14 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ign
         checkRemoved();
 
         try{
-            return CU.outTx(incAndGetCall, ctx);
+            EntryProcessorResult<Long> res = atomicView.invoke(key, IncrementAndGetProcessor.INSTANCE);
+
+            assert res != null && res.get() != null : res;
+
+            return res.get();
+        }
+        catch (EntryProcessorException e) {
+            throw new IgniteException(e.getMessage(), e);
         }
         catch (IgniteCheckedException e) {
             throw U.convertException(e);
@@ -260,7 +146,14 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ign
         checkRemoved();
 
         try {
-            return CU.outTx(getAndIncCall, ctx);
+            EntryProcessorResult<Long> res = atomicView.invoke(key, GetAndIncrementProcessor.INSTANCE);
+
+            assert res != null && res.get() != null : res;
+
+            return res.get();
+        }
+        catch (EntryProcessorException e) {
+            throw new IgniteException(e.getMessage(), e);
         }
         catch (IgniteCheckedException e) {
             throw U.convertException(e);
@@ -272,7 +165,14 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ign
         checkRemoved();
 
         try {
-            return CU.outTx(internalAddAndGet(l), ctx);
+            EntryProcessorResult<Long> res = atomicView.invoke(key, new AddAndGetProcessor(l));
+
+            assert res != null && res.get() != null : res;
+
+            return res.get();
+        }
+        catch (EntryProcessorException e) {
+            throw new IgniteException(e.getMessage(), e);
         }
         catch (IgniteCheckedException e) {
             throw U.convertException(e);
@@ -284,7 +184,14 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ign
         checkRemoved();
 
         try {
-            return CU.outTx(internalGetAndAdd(l), ctx);
+            EntryProcessorResult<Long> res = atomicView.invoke(key, new GetAndAddProcessor(l));
+
+            assert res != null && res.get() != null : res;
+
+            return res.get();
+        }
+        catch (EntryProcessorException e) {
+            throw new IgniteException(e.getMessage(), e);
         }
         catch (IgniteCheckedException e) {
             throw U.convertException(e);
@@ -296,7 +203,14 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ign
         checkRemoved();
 
         try {
-            return CU.outTx(decAndGetCall, ctx);
+            EntryProcessorResult<Long> res = atomicView.invoke(key, DecrementAndGetProcessor.INSTANCE);
+
+            assert res != null && res.get() != null : res;
+
+            return res.get();
+        }
+        catch (EntryProcessorException e) {
+            throw new IgniteException(e.getMessage(), e);
         }
         catch (IgniteCheckedException e) {
             throw U.convertException(e);
@@ -308,7 +222,14 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ign
         checkRemoved();
 
         try {
-            return CU.outTx(getAndDecCall, ctx);
+            EntryProcessorResult<Long> res = atomicView.invoke(key, GetAndDecrementProcessor.INSTANCE);
+
+            assert res != null && res.get() != null : res;
+
+            return res.get();
+        }
+        catch (EntryProcessorException e) {
+            throw new IgniteException(e.getMessage(), e);
         }
         catch (IgniteCheckedException e) {
             throw U.convertException(e);
@@ -320,7 +241,14 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ign
         checkRemoved();
 
         try {
-            return CU.outTx(internalGetAndSet(l), ctx);
+            EntryProcessorResult<Long> res = atomicView.invoke(key, new GetAndSetProcessor(l));
+
+            assert res != null && res.get() != null : res;
+
+            return res.get();
+        }
+        catch (EntryProcessorException e) {
+            throw new IgniteException(e.getMessage(), e);
         }
         catch (IgniteCheckedException e) {
             throw U.convertException(e);
@@ -332,7 +260,14 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ign
         checkRemoved();
 
         try {
-            return CU.outTx(internalCompareAndSetAndGet(expVal, newVal) , ctx) == expVal;
+            EntryProcessorResult<Long> res = atomicView.invoke(key, new CompareAndSetProcessor(expVal, newVal));
+
+            assert res != null && res.get() != null : res;
+
+            return res.get() == expVal;
+        }
+        catch (EntryProcessorException e) {
+            throw new IgniteException(e.getMessage(), e);
         }
         catch (IgniteCheckedException e) {
             throw U.convertException(e);
@@ -348,7 +283,14 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ign
         checkRemoved();
 
         try {
-            return CU.outTx(internalCompareAndSetAndGet(expVal, newVal), ctx);
+            EntryProcessorResult<Long> res = atomicView.invoke(key, new CompareAndSetProcessor(expVal, newVal));
+
+            assert res != null && res.get() != null : res;
+
+            return res.get();
+        }
+        catch (EntryProcessorException e) {
+            throw new IgniteException(e.getMessage(), e);
         }
         catch (IgniteCheckedException e) {
             throw U.convertException(e);
@@ -422,193 +364,335 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ign
         }
     }
 
+    /** {@inheritDoc} */
+    @Override public void onActivate(GridKernalContext kctx) throws IgniteCheckedException {
+        this.atomicView = kctx.cache().atomicsCache();
+        this.ctx = atomicView.context();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onDeActivate(GridKernalContext kctx) throws IgniteCheckedException {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        out.writeObject(ctx.kernalContext());
+        out.writeUTF(name);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        IgniteBiTuple<GridKernalContext, String> t = stash.get();
+
+        t.set1((GridKernalContext)in.readObject());
+        t.set2(in.readUTF());
+    }
+
     /**
-     * Method returns callable for execution {@link #addAndGet(long)} operation in async and sync mode.
+     * Reconstructs object on unmarshalling.
      *
-     * @param l Value will be added to atomic long.
-     * @return Callable for execution in async and sync mode.
+     * @return Reconstructed object.
+     * @throws ObjectStreamException Thrown in case of unmarshalling error.
      */
-    private Callable<Long> internalAddAndGet(final long l) {
-        return retryTopologySafe(new Callable<Long>() {
-            @Override public Long call() throws Exception {
-                try (GridNearTxLocal tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
-                    GridCacheAtomicLongValue val = atomicView.get(key);
+    private Object readResolve() throws ObjectStreamException {
+        try {
+            IgniteBiTuple<GridKernalContext, String> t = stash.get();
 
-                    if (val == null)
-                        throw new IgniteCheckedException("Failed to find atomic long with given name: " + name);
+            return t.get1().dataStructures().atomicLong(t.get2(), 0L, false);
+        }
+        catch (IgniteCheckedException e) {
+            throw U.withCause(new InvalidObjectException(e.getMessage()), e);
+        }
+        finally {
+            stash.remove();
+        }
+    }
 
-                    long retVal = val.get() + l;
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridCacheAtomicLongImpl.class, this);
+    }
 
-                    val.set(retVal);
+    /**
+     *
+     */
+    static class GetAndSetProcessor implements
+        CacheEntryProcessor<GridCacheInternalKey, GridCacheAtomicLongValue, Long> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** */
+        private final long newVal;
+
+        /**
+         * @param newVal New value.
+         */
+        GetAndSetProcessor(long newVal) {
+            this.newVal = newVal;
+        }
 
-                    atomicView.put(key, val);
+        /** {@inheritDoc} */
+        @Override public Long process(MutableEntry<GridCacheInternalKey, GridCacheAtomicLongValue> e, Object... args) {
+            GridCacheAtomicLongValue val = e.getValue();
 
-                    tx.commit();
+            if (val == null)
+                throw new EntryProcessorException("Failed to find atomic long: " + e.getKey().name());
 
-                    return retVal;
-                }
-                catch (Error | Exception e) {
-                    U.error(log, "Failed to add and get: " + this, e);
+            long curVal = val.get();
 
-                    throw e;
-                }
-            }
-        });
+            e.setValue(new GridCacheAtomicLongValue(newVal));
+
+            return curVal;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(GetAndSetProcessor.class, this);
+        }
     }
 
     /**
-     * Method returns callable for execution {@link #getAndAdd(long)} operation in async and sync mode.
      *
-     * @param l Value will be added to atomic long.
-     * @return Callable for execution in async and sync mode.
      */
-    private Callable<Long> internalGetAndAdd(final long l) {
-        return retryTopologySafe(new Callable<Long>() {
-            @Override public Long call() throws Exception {
-                try (GridNearTxLocal tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
-                    GridCacheAtomicLongValue val = atomicView.get(key);
-
-                    if (val == null)
-                        throw new IgniteCheckedException("Failed to find atomic long with given name: " + name);
+    static class GetAndAddProcessor implements
+        CacheEntryProcessor<GridCacheInternalKey, GridCacheAtomicLongValue, Long> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** */
+        private final long delta;
+
+        /**
+         * @param delta Delta.
+         */
+        GetAndAddProcessor(long delta) {
+            this.delta = delta;
+        }
 
-                    long retVal = val.get();
+        /** {@inheritDoc} */
+        @Override public Long process(MutableEntry<GridCacheInternalKey, GridCacheAtomicLongValue> e, Object... args) {
+            GridCacheAtomicLongValue val = e.getValue();
 
-                    val.set(retVal + l);
+            if (val == null)
+                throw new EntryProcessorException("Failed to find atomic long: " + e.getKey().name());
 
-                    atomicView.put(key, val);
+            long curVal = val.get();
 
-                    tx.commit();
+            e.setValue(new GridCacheAtomicLongValue(curVal + delta));
 
-                    return retVal;
-                }
-                catch (Error | Exception e) {
-                    U.error(log, "Failed to get and add: " + this, e);
+            return curVal;
+        }
 
-                    throw e;
-                }
-            }
-        });
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(GetAndAddProcessor.class, this);
+        }
     }
 
     /**
-     * Method returns callable for execution {@link #getAndSet(long)} operation in async and sync mode.
      *
-     * @param l Value will be added to atomic long.
-     * @return Callable for execution in async and sync mode.
      */
-    private Callable<Long> internalGetAndSet(final long l) {
-        return new Callable<Long>() {
-            @Override public Long call() throws Exception {
-                try (GridNearTxLocal tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
-                    GridCacheAtomicLongValue val = atomicView.get(key);
+    static class AddAndGetProcessor implements
+        CacheEntryProcessor<GridCacheInternalKey, GridCacheAtomicLongValue, Long> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** */
+        private final long delta;
+
+        /**
+         * @param delta Delta.
+         */
+        AddAndGetProcessor(long delta) {
+            this.delta = delta;
+        }
 
-                    if (val == null)
-                        throw new IgniteCheckedException("Failed to find atomic long with given name: " + name);
+        /** {@inheritDoc} */
+        @Override public Long process(MutableEntry<GridCacheInternalKey, GridCacheAtomicLongValue> e, Object... args) {
+            GridCacheAtomicLongValue val = e.getValue();
 
-                    long retVal = val.get();
+            if (val == null)
+                throw new EntryProcessorException("Failed to find atomic long: " + e.getKey().name());
 
-                    val.set(l);
+            long newVal = val.get() + delta;
 
-                    atomicView.put(key, val);
+            e.setValue(new GridCacheAtomicLongValue(newVal));
 
-                    tx.commit();
+            return newVal;
+        }
 
-                    return retVal;
-                }
-                catch (Error | Exception e) {
-                    U.error(log, "Failed to get and set: " + this, e);
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(AddAndGetProcessor.class, this);
+        }
+    }
 
-                    throw e;
-                }
-            }
-        };
+    /**
+     * Sets the new value if the current one equals the expected one; always returns the value read.
+     */
+    static class CompareAndSetProcessor implements
+        CacheEntryProcessor<GridCacheInternalKey, GridCacheAtomicLongValue, Long> {
+        /** Serial version UID. */
+        private static final long serialVersionUID = 0L;
+
+        /** Expected value. */
+        private final long expVal;
+
+        /** New value. */
+        private final long newVal;
+
+        /**
+         * @param expVal Expected value.
+         * @param newVal New value.
+         */
+        CompareAndSetProcessor(long expVal, long newVal) {
+            this.expVal = expVal;
+            this.newVal = newVal;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Long process(MutableEntry<GridCacheInternalKey, GridCacheAtomicLongValue> e, Object... args) {
+            GridCacheAtomicLongValue val = e.getValue();
+
+            if (val == null)
+                throw new EntryProcessorException("Failed to find atomic long: " + e.getKey().name());
+
+            long curVal = val.get();
+
+            if (curVal == expVal)
+                e.setValue(new GridCacheAtomicLongValue(newVal));
+
+            return curVal;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(CompareAndSetProcessor.class, this);
+        }
     }
 
     /**
-     * Method returns callable for execution {@link #compareAndSetAndGet(long, long)}
-     * operation in async and sync mode.
      *
-     * @param expVal Expected atomic long value.
-     * @param newVal New atomic long value.
-     * @return Callable for execution in async and sync mode.
      */
-    private Callable<Long> internalCompareAndSetAndGet(final long expVal, final long newVal) {
-        return new Callable<Long>() {
-            @Override public Long call() throws Exception {
-                try (GridNearTxLocal tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
-                    GridCacheAtomicLongValue val = atomicView.get(key);
+    static class GetAndIncrementProcessor implements
+        CacheEntryProcessor<GridCacheInternalKey, GridCacheAtomicLongValue, Long> {
+        /** */
+        private static final long serialVersionUID = 0L;
 
-                    if (val == null)
-                        throw new IgniteCheckedException("Failed to find atomic long with given name: " + name);
+        /** */
+        private static final GetAndIncrementProcessor INSTANCE = new GetAndIncrementProcessor();
 
-                    long retVal = val.get();
+        /** {@inheritDoc} */
+        @Override public Long process(MutableEntry<GridCacheInternalKey, GridCacheAtomicLongValue> e, Object... args) {
+            GridCacheAtomicLongValue val = e.getValue();
 
-                    if (retVal == expVal) {
-                        val.set(newVal);
+            if (val == null)
+                throw new EntryProcessorException("Failed to find atomic long: " + e.getKey().name());
 
-                        atomicView.getAndPut(key, val);
+            long ret = val.get();
 
-                        tx.commit();
-                    }
+            e.setValue(new GridCacheAtomicLongValue(ret + 1));
 
-                    return retVal;
-                }
-                catch (Error | Exception e) {
-                    U.error(log, "Failed to compare and set: " + this, e);
+            return ret;
+        }
 
-                    throw e;
-                }
-            }
-        };
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(GetAndIncrementProcessor.class, this);
+        }
     }
 
-    /** {@inheritDoc} */
-    @Override public void onActivate(GridKernalContext kctx) throws IgniteCheckedException {
-        this.atomicView = kctx.cache().atomicsCache();
-        this.ctx = atomicView.context();
-    }
+    /**
+     *
+     */
+    static class IncrementAndGetProcessor implements
+        CacheEntryProcessor<GridCacheInternalKey, GridCacheAtomicLongValue, Long> {
+        /** */
+        private static final long serialVersionUID = 0L;
 
-    /** {@inheritDoc} */
-    @Override public void onDeActivate(GridKernalContext kctx) throws IgniteCheckedException {
+        /** */
+        private static final IncrementAndGetProcessor INSTANCE = new IncrementAndGetProcessor();
 
-    }
+        /** {@inheritDoc} */
+        @Override public Long process(MutableEntry<GridCacheInternalKey, GridCacheAtomicLongValue> e, Object... args) {
+            GridCacheAtomicLongValue val = e.getValue();
 
-    /** {@inheritDoc} */
-    @Override public void writeExternal(ObjectOutput out) throws IOException {
-        out.writeObject(ctx.kernalContext());
-        out.writeUTF(name);
-    }
+            if (val == null)
+                throw new EntryProcessorException("Failed to find atomic long: " + e.getKey().name());
 
-    /** {@inheritDoc} */
-    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-        IgniteBiTuple<GridKernalContext, String> t = stash.get();
+            long newVal = val.get() + 1;
 
-        t.set1((GridKernalContext)in.readObject());
-        t.set2(in.readUTF());
+            e.setValue(new GridCacheAtomicLongValue(newVal));
+
+            return newVal;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(IncrementAndGetProcessor.class, this);
+        }
     }
 
     /**
-     * Reconstructs object on unmarshalling.
      *
-     * @return Reconstructed object.
-     * @throws ObjectStreamException Thrown in case of unmarshalling error.
      */
-    private Object readResolve() throws ObjectStreamException {
-        try {
-            IgniteBiTuple<GridKernalContext, String> t = stash.get();
+    static class GetAndDecrementProcessor implements
+        CacheEntryProcessor<GridCacheInternalKey, GridCacheAtomicLongValue, Long> {
+        /** */
+        private static final long serialVersionUID = 0L;
 
-            return t.get1().dataStructures().atomicLong(t.get2(), 0L, false);
-        }
-        catch (IgniteCheckedException e) {
-            throw U.withCause(new InvalidObjectException(e.getMessage()), e);
+        /** */
+        private static final GetAndDecrementProcessor INSTANCE = new GetAndDecrementProcessor();
+
+        /** {@inheritDoc} */
+        @Override public Long process(MutableEntry<GridCacheInternalKey, GridCacheAtomicLongValue> e, Object... args) {
+            GridCacheAtomicLongValue val = e.getValue();
+
+            if (val == null)
+                throw new EntryProcessorException("Failed to find atomic long: " + e.getKey().name());
+
+            long ret = val.get();
+
+            e.setValue(new GridCacheAtomicLongValue(ret - 1));
+
+            return ret;
         }
-        finally {
-            stash.remove();
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(GetAndDecrementProcessor.class, this);
         }
     }
 
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(GridCacheAtomicLongImpl.class, this);
+    /**
+     * Decrements the stored value by one and returns the decremented value.
+     */
+    static class DecrementAndGetProcessor implements
+        CacheEntryProcessor<GridCacheInternalKey, GridCacheAtomicLongValue, Long> {
+        /** Serial version UID. */
+        private static final long serialVersionUID = 0L;
+
+        /** Reusable instance (the processor declares no instance fields). */
+        private static final DecrementAndGetProcessor INSTANCE = new DecrementAndGetProcessor();
+
+        /** {@inheritDoc} */
+        @Override public Long process(MutableEntry<GridCacheInternalKey, GridCacheAtomicLongValue> e, Object... args) {
+            GridCacheAtomicLongValue val = e.getValue();
+
+            if (val == null)
+                throw new EntryProcessorException("Failed to find atomic long: " + e.getKey().name());
+
+            long newVal = val.get() - 1;
+
+            e.setValue(new GridCacheAtomicLongValue(newVal));
+
+            return newVal;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(DecrementAndGetProcessor.class, this);
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/ee955df9/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java
index 4365468..b7dc007 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java
@@ -23,24 +23,21 @@ import java.io.InvalidObjectException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
 import java.io.ObjectStreamException;
-import java.util.concurrent.Callable;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.EntryProcessorResult;
+import javax.cache.processor.MutableEntry;
 import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CacheEntryProcessor;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
-import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
 import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.internal.processors.cluster.IgniteChangeGlobalStateSupport;
 import org.apache.ignite.lang.IgniteBiTuple;
 
-import static org.apache.ignite.internal.processors.cache.GridCacheUtils.retryTopologySafe;
-import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
-import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
-
 /**
  * Cache atomic reference implementation.
  */
@@ -56,9 +53,6 @@ public final class GridCacheAtomicReferenceImpl<T> implements GridCacheAtomicRef
             }
         };
 
-    /** Logger. */
-    private IgniteLogger log;
-
     /** Atomic reference name. */
     private String name;
 
@@ -77,18 +71,6 @@ public final class GridCacheAtomicReferenceImpl<T> implements GridCacheAtomicRef
     /** Cache context. */
     private GridCacheContext ctx;
 
-    /** Callable for {@link #get} operation */
-    private final Callable<T> getCall = new Callable<T>() {
-        @Override public T call() throws Exception {
-            GridCacheAtomicReferenceValue<T> ref = atomicView.get(key);
-
-            if (ref == null)
-                throw new IgniteCheckedException("Failed to find atomic reference with given name: " + name);
-
-            return ref.get();
-        }
-    };
-
     /**
      * Empty constructor required by {@link Externalizable}.
      */
@@ -117,8 +99,6 @@ public final class GridCacheAtomicReferenceImpl<T> implements GridCacheAtomicRef
         this.key = key;
         this.atomicView = atomicView;
         this.name = name;
-
-        log = ctx.logger(getClass());
     }
 
     /** {@inheritDoc} */
@@ -131,7 +111,12 @@ public final class GridCacheAtomicReferenceImpl<T> implements GridCacheAtomicRef
         checkRemoved();
 
         try {
-            return CU.outTx(getCall, ctx);
+            GridCacheAtomicReferenceValue<T> ref = atomicView.get(key);
+
+            if (ref == null)
+                throw new IgniteCheckedException("Failed to find atomic reference with given name: " + name);
+
+            return ref.get();
         }
         catch (IgniteCheckedException e) {
             throw U.convertException(e);
@@ -143,7 +128,10 @@ public final class GridCacheAtomicReferenceImpl<T> implements GridCacheAtomicRef
         checkRemoved();
 
         try {
-            CU.outTx(internalSet(val), ctx);
+            atomicView.invoke(key, new ReferenceSetEntryProcessor<>(val));
+        }
+        catch (EntryProcessorException e) {
+            throw new IgniteException(e.getMessage(), e);
         }
         catch (IgniteCheckedException e) {
             throw U.convertException(e);
@@ -152,20 +140,42 @@ public final class GridCacheAtomicReferenceImpl<T> implements GridCacheAtomicRef
 
     /** {@inheritDoc} */
     @Override public boolean compareAndSet(T expVal, T newVal) {
-        return compareAndSetAndGet(newVal, expVal) == expVal;
+        try {
+            EntryProcessorResult<Boolean> res =
+                atomicView.invoke(key, new ReferenceCompareAndSetEntryProcessor<>(expVal, newVal));
+
+            assert res != null && res.get() != null : res;
+
+            return res.get();
+        }
+        catch (EntryProcessorException e) {
+            throw new IgniteException(e.getMessage(), e);
+        }
+        catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
     }
 
     /**
      * Compares current value with specified value for equality and, if they are equal, replaces current value.
      *
      * @param newVal New value to set.
+     * @param expVal Expected value.
      * @return Original value.
      */
     public T compareAndSetAndGet(T newVal, T expVal) {
         checkRemoved();
 
         try {
-            return CU.outTx(internalCompareAndSetAndGet(expVal, newVal), ctx);
+            EntryProcessorResult<T> res =
+                atomicView.invoke(key, new ReferenceCompareAndSetAndGetEntryProcessor<T>(expVal, newVal));
+
+            assert res != null;
+
+            return res.get();
+        }
+        catch (EntryProcessorException e) {
+            throw new IgniteException(e.getMessage(), e);
         }
         catch (IgniteCheckedException e) {
             throw U.convertException(e);
@@ -205,82 +215,6 @@ public final class GridCacheAtomicReferenceImpl<T> implements GridCacheAtomicRef
         }
     }
 
-    /**
-     * Method returns callable for execution {@link #set(Object)} operation in async and sync mode.
-     *
-     * @param val Value will be set in reference .
-     * @return Callable for execution in async and sync mode.
-     */
-    private Callable<Boolean> internalSet(final T val) {
-        return retryTopologySafe(new Callable<Boolean>() {
-            @Override public Boolean call() throws Exception {
-                try (GridNearTxLocal tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
-                    GridCacheAtomicReferenceValue<T> ref = atomicView.get(key);
-
-                    if (ref == null)
-                        throw new IgniteCheckedException("Failed to find atomic reference with given name: " + name);
-
-                    ref.set(val);
-
-                    atomicView.put(key, ref);
-
-                    tx.commit();
-
-                    return true;
-                }
-                catch (Error | Exception e) {
-                    U.error(log, "Failed to set value [val=" + val + ", atomicReference=" + this + ']', e);
-
-                    throw e;
-                }
-            }
-        });
-    }
-
-    /**
-     * Conditionally sets the new value. It will be set if {@code expValPred} is
-     * evaluate to {@code true}.
-     *
-     * @param expVal Expected value.
-     * @param newVal New value.
-     * @return Callable for execution in async and sync mode.
-     */
-    private Callable<T> internalCompareAndSetAndGet(final T expVal, final T newVal) {
-        return retryTopologySafe(new Callable<T>() {
-            @Override public T call() throws Exception {
-                try (GridNearTxLocal tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
-                    GridCacheAtomicReferenceValue<T> ref = atomicView.get(key);
-
-                    if (ref == null)
-                        throw new IgniteCheckedException("Failed to find atomic reference with given name: " + name);
-
-                    T origVal = ref.get();
-
-                    if (!F.eq(expVal, origVal)) {
-                        tx.setRollbackOnly();
-
-                        return origVal;
-                    }
-                    else {
-                        ref.set(newVal);
-
-                        atomicView.getAndPut(key, ref);
-
-                        tx.commit();
-
-                        return expVal;
-                    }
-                }
-                catch (Error | Exception e) {
-                    U.error(log, "Failed to compare and value [expVal=" + expVal + ", newVal" +
-                        newVal + ", atomicReference" + this + ']', e);
-
-                    throw e;
-                }
-            }
-        });
-    }
-
     /** {@inheritDoc} */
     @Override public void onActivate(GridKernalContext kctx) throws IgniteCheckedException {
         this.atomicView = kctx.cache().atomicsCache();
@@ -289,7 +223,7 @@ public final class GridCacheAtomicReferenceImpl<T> implements GridCacheAtomicRef
 
     /** {@inheritDoc} */
     @Override public void onDeActivate(GridKernalContext kctx) throws IgniteCheckedException {
-
+        // No-op.
     }
 
     /**
@@ -363,6 +297,136 @@ public final class GridCacheAtomicReferenceImpl<T> implements GridCacheAtomicRef
         }
     }
 
+    /**
+     * Unconditionally replaces the referenced value; fails if the atomic reference entry is missing.
+     */
+    static class ReferenceSetEntryProcessor<T> implements
+        CacheEntryProcessor<GridCacheInternalKey, GridCacheAtomicReferenceValue<T>, Void> {
+        /** Serial version UID. */
+        private static final long serialVersionUID = 0L;
+
+        /** New value. */
+        private final T newVal;
+
+        /**
+         * @param newVal New value.
+         */
+        ReferenceSetEntryProcessor(T newVal) {
+            this.newVal = newVal;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Void process(MutableEntry<GridCacheInternalKey, GridCacheAtomicReferenceValue<T>> e,
+            Object... args) {
+            GridCacheAtomicReferenceValue<T> val = e.getValue();
+
+            if (val == null)
+                throw new EntryProcessorException("Failed to find atomic reference with given name: " + e.getKey().name());
+
+            e.setValue(new GridCacheAtomicReferenceValue<>(newVal));
+
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(ReferenceSetEntryProcessor.class, this);
+        }
+    }
+
+    /**
+     * Sets the new value if the current one equals the expected one; returns {@code true} if it was set.
+     */
+    static class ReferenceCompareAndSetEntryProcessor<T> implements
+        CacheEntryProcessor<GridCacheInternalKey, GridCacheAtomicReferenceValue<T>, Boolean> {
+        /** Serial version UID. */
+        private static final long serialVersionUID = 0L;
+
+        /** Expected value. */
+        private final T expVal;
+
+        /** New value. */
+        private final T newVal;
+
+        /**
+         * @param expVal Expected value.
+         * @param newVal New value.
+         */
+        ReferenceCompareAndSetEntryProcessor(T expVal, T newVal) {
+            this.expVal = expVal;
+            this.newVal = newVal;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Boolean process(MutableEntry<GridCacheInternalKey, GridCacheAtomicReferenceValue<T>> e,
+            Object... args) {
+            GridCacheAtomicReferenceValue<T> val = e.getValue();
+
+            if (val == null)
+                throw new EntryProcessorException("Failed to find atomic reference with given name: " + e.getKey().name());
+
+            T curVal = val.get();
+
+            if (F.eq(expVal, curVal)) {
+                e.setValue(new GridCacheAtomicReferenceValue<T>(newVal));
+
+                return true;
+            }
+
+            return false;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(ReferenceCompareAndSetEntryProcessor.class, this);
+        }
+    }
+
+    /**
+     * Sets the new value if the current one equals the expected one; always returns the value read.
+     */
+    static class ReferenceCompareAndSetAndGetEntryProcessor<T> implements
+        CacheEntryProcessor<GridCacheInternalKey, GridCacheAtomicReferenceValue<T>, T> {
+        /** Serial version UID. */
+        private static final long serialVersionUID = 0L;
+
+        /** Expected value. */
+        private final T expVal;
+
+        /** New value. */
+        private final T newVal;
+
+        /**
+         * @param expVal Expected value.
+         * @param newVal New value.
+         */
+        ReferenceCompareAndSetAndGetEntryProcessor(T expVal, T newVal) {
+            this.expVal = expVal;
+            this.newVal = newVal;
+        }
+
+        /** {@inheritDoc} */
+        @Override public T process(MutableEntry<GridCacheInternalKey, GridCacheAtomicReferenceValue<T>> e,
+            Object... args) {
+            GridCacheAtomicReferenceValue<T> val = e.getValue();
+
+            if (val == null)
+                throw new EntryProcessorException("Failed to find atomic reference with given name: " + e.getKey().name());
+
+            T curVal = val.get();
+
+            if (F.eq(expVal, curVal))
+                e.setValue(new GridCacheAtomicReferenceValue<T>(newVal));
+
+            return curVal;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(ReferenceCompareAndSetAndGetEntryProcessor.class, this);
+        }
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(GridCacheAtomicReferenceImpl.class, this);

http://git-wip-us.apache.org/repos/asf/ignite/blob/ee955df9/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java
index 0661b11..d14bb47 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java
@@ -32,11 +32,9 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
-import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.internal.util.typedef.internal.CU;
@@ -256,7 +254,7 @@ public final class GridCacheAtomicSequenceImpl implements GridCacheAtomicSequenc
             if (updateGuard.compareAndSet(false, true)) {
                 try {
                     try {
-                        return updateCall.call();
+                        return retryTopologySafe(updateCall);
                     }
                     catch (IgniteCheckedException | IgniteException | IllegalStateException e) {
                         throw e;
@@ -303,86 +301,6 @@ public final class GridCacheAtomicSequenceImpl implements GridCacheAtomicSequenc
         }
     }
 
-    /**
-     * Asynchronous sequence update operation. Will add given amount to the sequence value.
-     *
-     * @param l Increment amount.
-     * @param updateCall Cache call that will update sequence reservation count in accordance with l.
-     * @param updated If {@code true}, will return sequence value after update, otherwise will return sequence value
-     *      prior to update.
-     * @return Future indicating sequence value.
-     * @throws IgniteCheckedException If update failed.
-     */
-    @SuppressWarnings("SignalWithoutCorrespondingAwait")
-    private IgniteInternalFuture<Long> internalUpdateAsync(long l, @Nullable Callable<Long> updateCall, boolean updated)
-        throws IgniteCheckedException {
-        checkRemoved();
-
-        A.ensure(l > 0, " Parameter mustn't be less then 1: " + l);
-
-        lock.lock();
-
-        try {
-            // If reserved range isn't exhausted.
-            if (locVal + l <= upBound) {
-                long curVal = locVal;
-
-                locVal += l;
-
-                return new GridFinishedFuture<>(updated ? locVal : curVal);
-            }
-        }
-        finally {
-            lock.unlock();
-        }
-
-        if (updateCall == null)
-            updateCall = internalUpdate(l, updated);
-
-        while (true) {
-            if (updateGuard.compareAndSet(false, true)) {
-                try {
-                    // This call must be outside lock.
-                    return ctx.closures().callLocalSafe(updateCall, true);
-                }
-                finally {
-                    lock.lock();
-
-                    try {
-                        updateGuard.set(false);
-
-                        cond.signalAll();
-                    }
-                    finally {
-                        lock.unlock();
-                    }
-                }
-            }
-            else {
-                lock.lock();
-
-                try {
-                    while (locVal >= upBound && updateGuard.get())
-                        U.await(cond, 500, MILLISECONDS);
-
-                    checkRemoved();
-
-                    // If reserved range isn't exhausted.
-                    if (locVal + l <= upBound) {
-                        long curVal = locVal;
-
-                        locVal += l;
-
-                        return new GridFinishedFuture<>(updated ? locVal : curVal);
-                    }
-                }
-                finally {
-                    lock.unlock();
-                }
-            }
-        }
-    }
-
     /** Get local batch size for this sequences.
      *
      * @return Sequence batch size.
@@ -485,7 +403,7 @@ public final class GridCacheAtomicSequenceImpl implements GridCacheAtomicSequenc
      */
     @SuppressWarnings("TooBroadScope")
     private Callable<Long> internalUpdate(final long l, final boolean updated) {
-        return retryTopologySafe(new Callable<Long>() {
+        return new Callable<Long>() {
             @Override public Long call() throws Exception {
                 try (GridNearTxLocal tx = CU.txStartInternal(ctx, seqView, PESSIMISTIC, REPEATABLE_READ)) {
                     GridCacheAtomicSequenceValue seq = seqView.get(key);
@@ -556,7 +474,7 @@ public final class GridCacheAtomicSequenceImpl implements GridCacheAtomicSequenc
                     throw e;
                 }
             }
-        });
+        };
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/ee955df9/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java
index 09cea43..3f14942 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java
@@ -23,25 +23,20 @@ import java.io.InvalidObjectException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
 import java.io.ObjectStreamException;
-import java.util.concurrent.Callable;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.EntryProcessorResult;
+import javax.cache.processor.MutableEntry;
 import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CacheEntryProcessor;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
-import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
-import org.apache.ignite.internal.util.F0;
 import org.apache.ignite.internal.util.tostring.GridToStringBuilder;
-import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.internal.processors.cluster.IgniteChangeGlobalStateSupport;
 import org.apache.ignite.lang.IgniteBiTuple;
-import org.apache.ignite.lang.IgniteClosure;
-import org.apache.ignite.lang.IgnitePredicate;
-
-import static org.apache.ignite.internal.util.typedef.internal.CU.retryTopologySafe;
-import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
-import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
 
 /**
  * Cache atomic stamped implementation.
@@ -58,9 +53,6 @@ public final class GridCacheAtomicStampedImpl<T, S> implements GridCacheAtomicSt
             }
         };
 
-    /** Logger. */
-    private IgniteLogger log;
-
     /** Atomic stamped name. */
     private String name;
 
@@ -79,42 +71,6 @@ public final class GridCacheAtomicStampedImpl<T, S> implements GridCacheAtomicSt
     /** Cache context. */
     private GridCacheContext ctx;
 
-    /** Callable for {@link #get()} operation */
-    private final Callable<IgniteBiTuple<T, S>> getCall = retryTopologySafe(new Callable<IgniteBiTuple<T, S>>() {
-        @Override public IgniteBiTuple<T, S> call() throws Exception {
-            GridCacheAtomicStampedValue<T, S> stmp = atomicView.get(key);
-
-            if (stmp == null)
-                throw new IgniteCheckedException("Failed to find atomic stamped with given name: " + name);
-
-            return stmp.get();
-        }
-    });
-
-    /** Callable for {@link #value()} operation */
-    private final Callable<T> valCall = retryTopologySafe(new Callable<T>() {
-        @Override public T call() throws Exception {
-            GridCacheAtomicStampedValue<T, S> stmp = atomicView.get(key);
-
-            if (stmp == null)
-                throw new IgniteCheckedException("Failed to find atomic stamped with given name: " + name);
-
-            return stmp.value();
-        }
-    });
-
-    /** Callable for {@link #stamp()} operation */
-    private final Callable<S> stampCall = retryTopologySafe(new Callable<S>() {
-        @Override public S call() throws Exception {
-            GridCacheAtomicStampedValue<T, S> stmp = atomicView.get(key);
-
-            if (stmp == null)
-                throw new IgniteCheckedException("Failed to find atomic stamped with given name: " + name);
-
-            return stmp.stamp();
-        }
-    });
-
     /**
      * Empty constructor required by {@link Externalizable}.
      */
@@ -130,8 +86,10 @@ public final class GridCacheAtomicStampedImpl<T, S> implements GridCacheAtomicSt
      * @param atomicView Atomic projection.
      * @param ctx Cache context.
      */
-    public GridCacheAtomicStampedImpl(String name, GridCacheInternalKey key, IgniteInternalCache<GridCacheInternalKey,
-            GridCacheAtomicStampedValue<T, S>> atomicView, GridCacheContext ctx) {
+    public GridCacheAtomicStampedImpl(String name,
+        GridCacheInternalKey key,
+        IgniteInternalCache<GridCacheInternalKey, GridCacheAtomicStampedValue<T, S>> atomicView,
+        GridCacheContext ctx) {
         assert key != null;
         assert atomicView != null;
         assert ctx != null;
@@ -141,8 +99,6 @@ public final class GridCacheAtomicStampedImpl<T, S> implements GridCacheAtomicSt
         this.key = key;
         this.atomicView = atomicView;
         this.name = name;
-
-        log = ctx.logger(getClass());
     }
 
     /** {@inheritDoc} */
@@ -155,7 +111,12 @@ public final class GridCacheAtomicStampedImpl<T, S> implements GridCacheAtomicSt
         checkRemoved();
 
         try {
-            return CU.outTx(getCall, ctx);
+            GridCacheAtomicStampedValue<T, S> stmp = atomicView.get(key);
+
+            if (stmp == null)
+                throw new IgniteCheckedException("Failed to find atomic stamped with given name: " + name);
+
+            return stmp.get();
         }
         catch (IgniteCheckedException e) {
             throw U.convertException(e);
@@ -167,7 +128,10 @@ public final class GridCacheAtomicStampedImpl<T, S> implements GridCacheAtomicSt
         checkRemoved();
 
         try {
-            CU.outTx(internalSet(val, stamp), ctx);
+            atomicView.invoke(key, new StampedSetEntryProcessor<>(val, stamp));
+        }
+        catch (EntryProcessorException e) {
+            throw new IgniteException(e.getMessage(), e);
         }
         catch (IgniteCheckedException e) {
             throw U.convertException(e);
@@ -179,8 +143,15 @@ public final class GridCacheAtomicStampedImpl<T, S> implements GridCacheAtomicSt
         checkRemoved();
 
         try {
-            return CU.outTx(internalCompareAndSet(F0.equalTo(expVal), wrapperClosure(newVal),
-                F0.equalTo(expStamp), wrapperClosure(newStamp)), ctx);
+            EntryProcessorResult<Boolean> res =
+                atomicView.invoke(key, new StampedCompareAndSetEntryProcessor<>(expVal, expStamp, newVal, newStamp));
+
+            assert res != null && res.get() != null : res;
+
+            return res.get();
+        }
+        catch (EntryProcessorException e) {
+            throw new IgniteException(e.getMessage(), e);
         }
         catch (IgniteCheckedException e) {
             throw U.convertException(e);
@@ -192,7 +163,12 @@ public final class GridCacheAtomicStampedImpl<T, S> implements GridCacheAtomicSt
         checkRemoved();
 
         try {
-            return CU.outTx(stampCall, ctx);
+            GridCacheAtomicStampedValue<T, S> stmp = atomicView.get(key);
+
+            if (stmp == null)
+                throw new IgniteCheckedException("Failed to find atomic stamped with given name: " + name);
+
+            return stmp.stamp();
         }
         catch (IgniteCheckedException e) {
             throw U.convertException(e);
@@ -204,7 +180,12 @@ public final class GridCacheAtomicStampedImpl<T, S> implements GridCacheAtomicSt
         checkRemoved();
 
         try {
-            return CU.outTx(valCall, ctx);
+            GridCacheAtomicStampedValue<T, S> stmp = atomicView.get(key);
+
+            if (stmp == null)
+                throw new IgniteCheckedException("Failed to find atomic stamped with given name: " + name);
+
+            return stmp.value();
         }
         catch (IgniteCheckedException e) {
             throw U.convertException(e);
@@ -244,100 +225,6 @@ public final class GridCacheAtomicStampedImpl<T, S> implements GridCacheAtomicSt
         }
     }
 
-    /**
-     * Method make wrapper closure for existing value.
-     *
-     * @param val Value.
-     * @return Closure.
-     */
-    private <N> IgniteClosure<N, N> wrapperClosure(final N val) {
-        return new IgniteClosure<N, N>() {
-            @Override public N apply(N e) {
-                return val;
-            }
-        };
-    }
-
-    /**
-     * Method returns callable for execution {@link #set(Object,Object)}} operation in async and sync mode.
-     *
-     * @param val Value will be set in the atomic stamped.
-     * @param stamp Stamp will be set in the atomic stamped.
-     * @return Callable for execution in async and sync mode.
-     */
-    private Callable<Boolean> internalSet(final T val, final S stamp) {
-        return retryTopologySafe(new Callable<Boolean>() {
-            @Override public Boolean call() throws Exception {
-                try (GridNearTxLocal tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
-                    GridCacheAtomicStampedValue<T, S> stmp = atomicView.get(key);
-
-                    if (stmp == null)
-                        throw new IgniteCheckedException("Failed to find atomic stamped with given name: " + name);
-
-                    stmp.set(val, stamp);
-
-                    atomicView.put(key, stmp);
-
-                    tx.commit();
-
-                    return true;
-                }
-                catch (Error | Exception e) {
-                    U.error(log, "Failed to set [val=" + val + ", stamp=" + stamp + ", atomicStamped=" + this + ']', e);
-
-                    throw e;
-                }
-            }
-        });
-    }
-
-    /**
-     * Conditionally asynchronously sets the new value and new stamp. They will be set if
-     * {@code expValPred} and {@code expStampPred} both evaluate to {@code true}.
-     *
-     * @param expValPred Predicate which should evaluate to {@code true} for value to be set
-     * @param newValClos Closure generates new value.
-     * @param expStampPred Predicate which should evaluate to {@code true} for value to be set
-     * @param newStampClos Closure generates new stamp value.
-     * @return Callable for execution in async and sync mode.
-     */
-    private Callable<Boolean> internalCompareAndSet(final IgnitePredicate<T> expValPred,
-        final IgniteClosure<T, T> newValClos, final IgnitePredicate<S> expStampPred,
-        final IgniteClosure<S, S> newStampClos) {
-        return retryTopologySafe(new Callable<Boolean>() {
-            @Override public Boolean call() throws Exception {
-                try (GridNearTxLocal tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
-                    GridCacheAtomicStampedValue<T, S> stmp = atomicView.get(key);
-
-                    if (stmp == null)
-                        throw new IgniteCheckedException("Failed to find atomic stamped with given name: " + name);
-
-                    if (!(expValPred.apply(stmp.value()) && expStampPred.apply(stmp.stamp()))) {
-                        tx.setRollbackOnly();
-
-                        return false;
-                    }
-                    else {
-                        stmp.set(newValClos.apply(stmp.value()), newStampClos.apply(stmp.stamp()));
-
-                        atomicView.getAndPut(key, stmp);
-
-                        tx.commit();
-
-                        return true;
-                    }
-                }
-                catch (Error | Exception e) {
-                    U.error(log, "Failed to compare and set [expValPred=" + expValPred + ", newValClos=" +
-                        newValClos + ", expStampPred=" + expStampPred + ", newStampClos=" + newStampClos +
-                        ", atomicStamped=" + this + ']', e);
-
-                    throw e;
-                }
-            }
-        });
-    }
-
     /** {@inheritDoc} */
     @Override public void writeExternal(ObjectOutput out) throws IOException {
         out.writeObject(ctx.kernalContext());
@@ -418,6 +305,104 @@ public final class GridCacheAtomicStampedImpl<T, S> implements GridCacheAtomicSt
         return new IllegalStateException("Atomic stamped was removed from cache: " + name);
     }
 
+    /**
+     *
+     */
+    static class StampedSetEntryProcessor<T, S> implements
+        CacheEntryProcessor<GridCacheInternalKey, GridCacheAtomicStampedValue<T, S>, Void> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** */
+        private final T newVal;
+
+        /** */
+        private final S newStamp;
+
+        /**
+         * @param newVal New value.
+         * @param newStamp New stamp value.
+         */
+        StampedSetEntryProcessor(T newVal, S newStamp) {
+            this.newVal = newVal;
+            this.newStamp = newStamp;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Void process(MutableEntry<GridCacheInternalKey, GridCacheAtomicStampedValue<T, S>> e,
+            Object... args) {
+            GridCacheAtomicStampedValue val = e.getValue();
+
+            if (val == null)
+                throw new EntryProcessorException("Failed to find atomic stamped with given name: " + e.getKey().name());
+
+            e.setValue(new GridCacheAtomicStampedValue<>(newVal, newStamp));
+
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return GridToStringBuilder.toString(StampedSetEntryProcessor.class, this);
+        }
+    }
+
+    /**
+     *
+     */
+    static class StampedCompareAndSetEntryProcessor<T, S> implements
+        CacheEntryProcessor<GridCacheInternalKey, GridCacheAtomicStampedValue<T, S>, Boolean> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** */
+        private final T expVal;
+
+        /** */
+        private final S expStamp;
+
+        /** */
+        private final T newVal;
+
+        /** */
+        private final S newStamp;
+
+        /**
+         * @param expVal Expected value.
+         * @param expStamp Expected stamp.
+         * @param newVal New value.
+         * @param newStamp New stamp value.
+         */
+        StampedCompareAndSetEntryProcessor(T expVal, S expStamp, T newVal, S newStamp) {
+            this.expVal = expVal;
+            this.expStamp = expStamp;
+            this.newVal = newVal;
+            this.newStamp = newStamp;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Boolean process(MutableEntry<GridCacheInternalKey, GridCacheAtomicStampedValue<T, S>> e,
+            Object... args) {
+            GridCacheAtomicStampedValue val = e.getValue();
+
+            if (val == null)
+                throw new EntryProcessorException("Failed to find atomic stamped with given name: " + e.getKey().name());
+
+            if (F.eq(expVal, val.value()) && F.eq(expStamp, val.stamp())) {
+                e.setValue(new GridCacheAtomicStampedValue<>(newVal, newStamp));
+
+                return true;
+            }
+
+            return false;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return GridToStringBuilder.toString(StampedCompareAndSetEntryProcessor.class, this);
+        }
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return GridToStringBuilder.toString(GridCacheAtomicStampedImpl.class, this);

http://git-wip-us.apache.org/repos/asf/ignite/blob/ee955df9/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java
index ea80cc5..86e99a9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java
@@ -152,7 +152,9 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc
     /** {@inheritDoc} */
     @Override public int count() {
         try {
-            return CU.outTx(new GetCountCallable(), ctx);
+            GridCacheCountDownLatchValue latchVal = latchView.get(key);
+
+            return latchVal == null ? 0 : latchVal.get();
         }
         catch (IgniteCheckedException e) {
             throw U.convertException(e);
@@ -208,7 +210,7 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc
         A.ensure(val > 0, "val should be positive");
 
         try {
-            return CU.outTx(retryTopologySafe(new CountDownCallable(val)), ctx);
+            return retryTopologySafe(new CountDownCallable(val));
         }
         catch (IgniteCheckedException e) {
             throw U.convertException(e);
@@ -218,7 +220,7 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc
     /** {@inheritDoc}*/
     @Override public void countDownAll() {
         try {
-            CU.outTx(retryTopologySafe(new CountDownCallable(0)), ctx);
+            retryTopologySafe(new CountDownCallable(0));
         }
         catch (IgniteCheckedException e) {
             throw U.convertException(e);
@@ -255,23 +257,22 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc
             int state = initGuard.get();
 
             if (state != READY_LATCH_STATE) {
-                /** Internal latch is not fully initialized yet. Remember latest latch value. */
+                /* Internal latch is not fully initialized yet. Remember latest latch value. */
                 lastLatchVal = cnt;
 
                 return;
             }
 
-            /** 'synchronized' statement guarantees visibility of internalLatch. No need to make it volatile. */
+            /* 'synchronized' statement guarantees visibility of internalLatch. No need to make it volatile. */
             latch0 = internalLatch;
         }
 
-        /** Internal latch is fully initialized and ready for the usage. */
+        /* Internal latch is fully initialized and ready for the usage. */
 
         assert latch0 != null;
 
         while (latch0.getCount() > cnt)
             latch0.countDown();
-
     }
 
     /**
@@ -280,27 +281,24 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc
     private void initializeLatch() throws IgniteCheckedException {
         if (initGuard.compareAndSet(UNINITIALIZED_LATCH_STATE, CREATING_LATCH_STATE)) {
             try {
-                internalLatch = CU.outTx(
-                    retryTopologySafe(new Callable<CountDownLatch>() {
-                        @Override public CountDownLatch call() throws Exception {
-                            try (GridNearTxLocal tx = CU.txStartInternal(ctx, latchView, PESSIMISTIC, REPEATABLE_READ)) {
-                                GridCacheCountDownLatchValue val = latchView.get(key);
+                internalLatch = retryTopologySafe(new Callable<CountDownLatch>() {
+                    @Override public CountDownLatch call() throws Exception {
+                        try (GridNearTxLocal tx = CU.txStartInternal(ctx, latchView, PESSIMISTIC, REPEATABLE_READ)) {
+                            GridCacheCountDownLatchValue val = latchView.get(key);
 
-                                if (val == null) {
-                                    if (log.isDebugEnabled())
-                                        log.debug("Failed to find count down latch with given name: " + name);
+                            if (val == null) {
+                                if (log.isDebugEnabled())
+                                    log.debug("Failed to find count down latch with given name: " + name);
 
-                                    return new CountDownLatch(0);
-                                }
+                                return new CountDownLatch(0);
+                            }
 
-                                tx.commit();
+                            tx.commit();
 
-                                return new CountDownLatch(val.get());
-                            }
+                            return new CountDownLatch(val.get());
                         }
-                    }),
-                    ctx
-                );
+                    }
+                });
 
                 synchronized (initGuard) {
                     if (lastLatchVal != null) {
@@ -392,18 +390,6 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc
     /**
      *
      */
-    private class GetCountCallable implements Callable<Integer> {
-        /** {@inheritDoc} */
-        @Override public Integer call() throws Exception {
-            GridCacheCountDownLatchValue latchVal = latchView.get(key);
-
-            return latchVal == null ? 0 : latchVal.get();
-        }
-    }
-
-    /**
-     *
-     */
     private class CountDownCallable implements Callable<Integer> {
         /** Value to count down on (if 0 then latch is counted down to 0). */
         private final int val;


Mime
View raw message