ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [06/34] ignite git commit: IGNITE-1678 code + test+ benchmark config
Date Tue, 14 Feb 2017 13:11:25 GMT
IGNITE-1678  code + test+ benchmark config


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/784958bf
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/784958bf
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/784958bf

Branch: refs/heads/ignite-3727-2
Commit: 784958bf6e0e8e81191af498f6a993b2bfd78204
Parents: cf4cd05
Author: DmitriyGovorukhin <dgovorukhin@gridgain.com>
Authored: Tue Sep 6 19:35:14 2016 +0300
Committer: DmitriyGovorukhin <dgovorukhin@gridgain.com>
Committed: Tue Sep 6 19:35:14 2016 +0300

----------------------------------------------------------------------
 .../org/apache/ignite/IgniteAtomicSequence.java |  15 +
 .../configuration/AtomicConfiguration.java      |  25 +
 .../datastructures/DataStructuresProcessor.java | 111 ++--
 .../GridCacheAtomicSequenceImpl.java            | 384 +++++-------
 ...AtomicSequenceMultiThreadedAbstractTest.java | 579 +++++++++++++++++++
 .../GridCacheSequenceApiSelfAbstractTest.java   | 101 +++-
 ...titionedAtomicSequenceMultiThreadedTest.java | 313 +---------
 ...GridCachePartitionedSequenceApiSelfTest.java |   4 +
 ...plicatedAtomicSequenceMultiThreadedTest.java |  33 ++
 .../GridCacheReplicatedSequenceApiSelfTest.java |   4 +
 .../IgniteCacheDataStructuresSelfTestSuite.java |   2 +
 .../cache/IgniteAtomicSequenceBenchmark.java    |  45 ++
 12 files changed, 991 insertions(+), 625 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/784958bf/modules/core/src/main/java/org/apache/ignite/IgniteAtomicSequence.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteAtomicSequence.java b/modules/core/src/main/java/org/apache/ignite/IgniteAtomicSequence.java
index a1e1392..aa1cbdf 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteAtomicSequence.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteAtomicSequence.java
@@ -118,6 +118,21 @@ public interface IgniteAtomicSequence extends Closeable {
     public void batchSize(int size);
 
     /**
+     * Gets local reserve percentage for this atomic sequence. When the reserve percentage of the batch size
+     * is reached, the sequence starts a new reservation in the background.
+     *
+     * @return Sequence reserve percentage.
+     */
+    public int reservePercentage();
+
+    /**
+     * Sets local reserve percentage for this atomic sequence.
+     *
+     * @param percentage Reserve percentage. Must be between 0 and 100.
+     */
+    public void reservePercentage(int percentage);
+
+    /**
      * Gets status of atomic sequence.
      *
      * @return {@code true} if atomic sequence was removed from cache, {@code false} otherwise.

http://git-wip-us.apache.org/repos/asf/ignite/blob/784958bf/modules/core/src/main/java/org/apache/ignite/configuration/AtomicConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/AtomicConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/AtomicConfiguration.java
index 6649b5e..ad96b73 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/AtomicConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/AtomicConfiguration.java
@@ -36,6 +36,9 @@ public class AtomicConfiguration {
     /** Default atomic sequence reservation size. */
     public static final int DFLT_ATOMIC_SEQUENCE_RESERVE_SIZE = 1000;
 
+    /** Default atomic sequence reservation percentage. */
+    public static final int DFLT_ATOMIC_SEQUENCE_RESERVE_PERCENTAGE = 80;
+
     /** Default batch size for all cache's sequences. */
     private int seqReserveSize = DFLT_ATOMIC_SEQUENCE_RESERVE_SIZE;
 
@@ -45,6 +48,9 @@ public class AtomicConfiguration {
     /** Number of backups. */
     private int backups = DFLT_BACKUPS;
 
+    /** Atomic sequence reservation percentage. */
+    private int atomicSeqReservePercentage = DFLT_ATOMIC_SEQUENCE_RESERVE_PERCENTAGE;
+
     /**
      * @return Number of backup nodes.
      */
@@ -98,6 +104,25 @@ public class AtomicConfiguration {
         this.seqReserveSize = seqReserveSize;
     }
 
+    /**
+     * Gets reserve percentage for configuration. When the reserve percentage of the batch size
+     * is reached, the sequence starts a new reservation in the background.
+     *
+     * @return Atomic sequence reservation percentage.
+     */
+    public int getAtomicSequenceReservePercentage() {
+        return atomicSeqReservePercentage;
+    }
+
+    /**
+     * Sets reserve percentage for configuration.
+     *
+     * @param atomicSeqReservePercentage Atomic sequence reservation percentage.
+     */
+    public void setAtomicSequenceReservePercentage(int atomicSeqReservePercentage) {
+        this.atomicSeqReservePercentage = atomicSeqReservePercentage;
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(AtomicConfiguration.class, this);

http://git-wip-us.apache.org/repos/asf/ignite/blob/784958bf/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
index 1cad22f..eb81ca2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
@@ -36,6 +36,7 @@ import javax.cache.event.EventType;
 import javax.cache.processor.EntryProcessor;
 import javax.cache.processor.EntryProcessorException;
 import javax.cache.processor.MutableEntry;
+
 import org.apache.ignite.IgniteAtomicLong;
 import org.apache.ignite.IgniteAtomicReference;
 import org.apache.ignite.IgniteAtomicSequence;
@@ -357,9 +358,9 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
                     if (seqVal == null && !create)
                         return null;
 
-                    // We should use offset because we already reserved left side of range.
+        /*            // We should use offset because we already reserved left side of range.
                     long off = atomicCfg.getAtomicSequenceReserveSize() > 1 ?
-                        atomicCfg.getAtomicSequenceReserveSize() - 1 : 1;
+                        atomicCfg.getAtomicSequenceReserveSize() - 1 : 1;*/
 
                     long upBound;
                     long locCntr;
@@ -367,18 +368,16 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
                     if (seqVal == null) {
                         locCntr = initVal;
 
-                        upBound = locCntr + off;
+                        upBound = locCntr + atomicCfg.getAtomicSequenceReserveSize();
 
-                        // Global counter must be more than reserved region.
-                        seqVal = new GridCacheAtomicSequenceValue(upBound + 1);
+                        seqVal = new GridCacheAtomicSequenceValue(upBound);
                     }
                     else {
                         locCntr = seqVal.get();
 
-                        upBound = locCntr + off;
+                        upBound = locCntr + atomicCfg.getAtomicSequenceReserveSize();
 
-                        // Global counter must be more than reserved region.
-                        seqVal.set(upBound + 1);
+                        seqVal.set(upBound);
                     }
 
                     // Update global counter.
@@ -390,6 +389,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
                         seqView,
                         dsCacheCtx,
                         atomicCfg.getAtomicSequenceReserveSize(),
+                        atomicCfg.getAtomicSequenceReservePercentage(),
                         locCntr,
                         upBound);
 
@@ -448,8 +448,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
      * Gets an atomic long from cache or creates one if it's not cached.
      *
      * @param name Name of atomic long.
-     * @param initVal Initial value for atomic long. If atomic long already cached, {@code initVal}
-     *        will be ignored.
+     * @param initVal Initial value for atomic long. If atomic long already cached, {@code initVal} will be ignored.
      * @param create If {@code true} atomic long will be created in case it is not in cache.
      * @return Atomic long.
      * @throws IgniteCheckedException If loading failed.
@@ -526,8 +525,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
         final DataStructureInfo dsInfo,
         final boolean create,
         Class<? extends T> cls)
-        throws IgniteCheckedException
-    {
+        throws IgniteCheckedException {
         Map<String, DataStructureInfo> dsMap = utilityCache.get(DATA_STRUCTURES_KEY);
 
         if (!create && (dsMap == null || !dsMap.containsKey(dsInfo.name)))
@@ -607,8 +605,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
         String name,
         DataStructureType type,
         @Nullable final IgniteInClosureX<T> afterRmv)
-        throws IgniteCheckedException
-    {
+        throws IgniteCheckedException {
         Map<String, DataStructureInfo> dsMap = utilityCache.get(DATA_STRUCTURES_KEY);
 
         if (dsMap == null || !dsMap.containsKey(name))
@@ -656,8 +653,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
      * Gets an atomic reference from cache or creates one if it's not cached.
      *
      * @param name Name of atomic reference.
-     * @param initVal Initial value for atomic reference. If atomic reference already cached, {@code initVal}
-     *        will be ignored.
+     * @param initVal Initial value for atomic reference. If atomic reference already cached, {@code initVal} will be
+     * ignored.
      * @param create If {@code true} atomic reference will be created in case it is not in cache.
      * @return Atomic reference.
      * @throws IgniteCheckedException If loading failed.
@@ -666,8 +663,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
     public final <T> IgniteAtomicReference<T> atomicReference(final String name,
         final T initVal,
         final boolean create)
-        throws IgniteCheckedException
-    {
+        throws IgniteCheckedException {
         A.notNull(name, "name");
 
         awaitInitialization();
@@ -761,10 +757,10 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
      * Gets an atomic stamped from cache or creates one if it's not cached.
      *
      * @param name Name of atomic stamped.
-     * @param initVal Initial value for atomic stamped. If atomic stamped already cached, {@code initVal}
-     *        will be ignored.
-     * @param initStamp Initial stamp for atomic stamped. If atomic stamped already cached, {@code initStamp}
-     *        will be ignored.
+     * @param initVal Initial value for atomic stamped. If atomic stamped already cached, {@code initVal} will be
+     * ignored.
+     * @param initStamp Initial stamp for atomic stamped. If atomic stamped already cached, {@code initStamp} will be
+     * ignored.
      * @param create If {@code true} atomic stamped will be created in case it is not in cache.
      * @return Atomic stamped.
      * @throws IgniteCheckedException If loading failed.
@@ -1005,8 +1001,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
     @Nullable private <T> T getCollection(final IgniteClosureX<GridCacheContext, T> c,
         final DataStructureInfo dsInfo,
         boolean create)
-        throws IgniteCheckedException
-    {
+        throws IgniteCheckedException {
         awaitInitialization();
 
         Map<String, DataStructureInfo> dsMap = utilityCache.get(DATA_STRUCTURES_KEY);
@@ -1083,8 +1078,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
     @Nullable private static IgniteCheckedException validateDataStructure(
         @Nullable Map<String, DataStructureInfo> dsMap,
         DataStructureInfo info,
-        boolean create)
-    {
+        boolean create) {
         if (dsMap == null)
             return null;
 
@@ -1102,20 +1096,17 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
      *
      * @param name Name of the latch.
      * @param cnt Initial count.
-     * @param autoDel {@code True} to automatically delete latch from cache when
-     *      its count reaches zero.
-     * @param create If {@code true} latch will be created in case it is not in cache,
-     *      if it is {@code false} all parameters except {@code name} are ignored.
-     * @return Count down latch for the given name or {@code null} if it is not found and
-     *      {@code create} is false.
+     * @param autoDel {@code True} to automatically delete latch from cache when its count reaches zero.
+     * @param create If {@code true} latch will be created in case it is not in cache, if it is {@code false} all
+     * parameters except {@code name} are ignored.
+     * @return Count down latch for the given name or {@code null} if it is not found and {@code create} is false.
      * @throws IgniteCheckedException If operation failed.
      */
     public IgniteCountDownLatch countDownLatch(final String name,
         final int cnt,
         final boolean autoDel,
         final boolean create)
-        throws IgniteCheckedException
-    {
+        throws IgniteCheckedException {
         A.notNull(name, "name");
 
         awaitInitialization();
@@ -1201,12 +1192,12 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
                 try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) {
                     // Check correctness type of removable object.
                     GridCacheCountDownLatchValue val =
-                            cast(dsView.get(key), GridCacheCountDownLatchValue.class);
+                        cast(dsView.get(key), GridCacheCountDownLatchValue.class);
 
                     if (val != null) {
                         if (val.get() > 0) {
                             throw new IgniteCheckedException("Failed to remove count down latch " +
-                                    "with non-zero count: " + val.get());
+                                "with non-zero count: " + val.get());
                         }
 
                         dsView.remove(key);
@@ -1232,10 +1223,9 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
      * @param name Name of the semaphore.
      * @param cnt Initial count.
      * @param failoverSafe {@code True} FailoverSafe parameter.
-     * @param create If {@code true} semaphore will be created in case it is not in cache,
-     *      if it is {@code false} all parameters except {@code name} are ignored.
-     * @return Semaphore for the given name or {@code null} if it is not found and
-     *      {@code create} is false.
+     * @param create If {@code true} semaphore will be created in case it is not in cache, if it is {@code false} all
+     * parameters except {@code name} are ignored.
+     * @return Semaphore for the given name or {@code null} if it is not found and {@code create} is false.
      * @throws IgniteCheckedException If operation failed.
      */
     public IgniteSemaphore semaphore(final String name, final int cnt, final boolean failoverSafe, final boolean create)
@@ -1351,11 +1341,11 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
      * @param failoverSafe Flag indicating behaviour in case of failure.
      * @param fair Flag indicating fairness policy of this lock.
      * @param create If {@code true} reentrant lock will be created in case it is not in cache.
-     * @return ReentrantLock for the given name or {@code null} if it is not found and
-     *      {@code create} is false.
+     * @return ReentrantLock for the given name or {@code null} if it is not found and {@code create} is false.
      * @throws IgniteCheckedException If operation failed.
      */
-    public IgniteLock reentrantLock(final String name, final boolean failoverSafe, final boolean fair, final boolean create)
+    public IgniteLock reentrantLock(final String name, final boolean failoverSafe, final boolean fair,
+        final boolean create)
         throws IgniteCheckedException {
         A.notNull(name, "name");
 
@@ -1532,8 +1522,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
         /** {@inheritDoc} */
         @Override public void onUpdated(
             Iterable<CacheEntryEvent<? extends GridCacheInternalKey, ? extends GridCacheInternal>> evts)
-            throws CacheEntryListenerException
-        {
+            throws CacheEntryListenerException {
             for (CacheEntryEvent<? extends GridCacheInternalKey, ? extends GridCacheInternal> evt : evts) {
                 if (evt.getEventType() == EventType.CREATED || evt.getEventType() == EventType.UPDATED) {
                     GridCacheInternal val0 = evt.getValue();
@@ -1603,8 +1592,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
                         }
                         else if (sem != null) {
                             U.error(log, "Failed to cast object " +
-                                    "[expected=" + IgniteSemaphore.class.getSimpleName() +
-                                    ", actual=" + sem.getClass() + ", value=" + sem + ']');
+                                "[expected=" + IgniteSemaphore.class.getSimpleName() +
+                                ", actual=" + sem.getClass() + ", value=" + sem + ']');
                         }
                     }
                     else if (val0 instanceof GridCacheLockState) {
@@ -1761,6 +1750,18 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
         if (atomicCfg == null)
             throw new IgniteException("Atomic data structure can not be created, " +
                 "need to provide IgniteAtomicConfiguration.");
+
+        if (atomicCfg.getAtomicSequenceReserveSize() <= 0)
+            throw new IgniteException(
+                "Atomic sequence can not be created, " +
+                    "reserve size must be more than 0, but atomicSequenceReserveSize: " + atomicCfg.getAtomicSequenceReserveSize()
+            );
+
+        if (atomicCfg.getAtomicSequenceReservePercentage() > 100)
+            throw new IgniteException(
+                "Atomic sequence can not be created, reserve percentage must have value " +
+                    "between 0 and 100, but atomicSequenceReservePercentage: " + atomicCfg.getAtomicSequenceReservePercentage()
+            );
     }
 
     /**
@@ -1779,7 +1780,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
                 col.cfg.getBackups() == cfg.getBackups() &&
                 col.cfg.getOffHeapMaxMemory() == cfg.getOffHeapMaxMemory() &&
                 ((col.cfg.getNodeFilter() == null && cfg.getNodeFilter() == null) ||
-                (col.cfg.getNodeFilter() != null && col.cfg.getNodeFilter().equals(cfg.getNodeFilter()))))
+                    (col.cfg.getNodeFilter() != null && col.cfg.getNodeFilter().equals(cfg.getNodeFilter()))))
                 return col.cacheName;
         }
 
@@ -1788,8 +1789,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
 
     /**
      * @param c Closure to run.
-     * @throws IgniteCheckedException If failed.
      * @return Closure return value.
+     * @throws IgniteCheckedException If failed.
      */
     private static <T> T retryTopologySafe(IgniteOutClosureX<T> c) throws IgniteCheckedException {
         for (int i = 0; i < GridCacheAdapter.MAX_RETRIES; i++) {
@@ -2144,8 +2145,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
         @Override public IgniteCheckedException process(
             MutableEntry<CacheDataStructuresConfigurationKey, Map<String, DataStructureInfo>> entry,
             Object... args)
-            throws EntryProcessorException
-        {
+            throws EntryProcessorException {
             Map<String, DataStructureInfo> map = entry.getValue();
 
             if (map == null) {
@@ -2223,8 +2223,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
         /** {@inheritDoc} */
         @Override public T2<String, IgniteCheckedException> process(
             MutableEntry<CacheDataStructuresConfigurationKey, Map<String, DataStructureInfo>> entry,
-            Object... args)
-        {
+            Object... args) {
             Map<String, DataStructureInfo> map = entry.getValue();
 
             CollectionInfo colInfo = (CollectionInfo)info.info;
@@ -2303,8 +2302,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
         /** {@inheritDoc} */
         @Override public String process(
             MutableEntry<CacheDataStructuresCacheKey, List<CacheCollectionInfo>> entry,
-            Object... args)
-        {
+            Object... args) {
             List<CacheCollectionInfo> list = entry.getValue();
 
             if (list == null) {
@@ -2380,8 +2378,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
         /** {@inheritDoc} */
         @Override public T2<Boolean, IgniteCheckedException> process(
             MutableEntry<CacheDataStructuresConfigurationKey, Map<String, DataStructureInfo>> entry,
-            Object... args)
-        {
+            Object... args) {
             Map<String, DataStructureInfo> map = entry.getValue();
 
             if (map == null)

http://git-wip-us.apache.org/repos/asf/ignite/blob/784958bf/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java
index 7474f46..77bbb41 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java
@@ -17,36 +17,21 @@
 
 package org.apache.ignite.internal.processors.datastructures;
 
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.InvalidObjectException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.io.ObjectStreamException;
-import java.util.concurrent.Callable;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
-import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
-import org.apache.ignite.internal.util.future.GridFinishedFuture;
-import org.apache.ignite.internal.util.typedef.internal.A;
-import org.apache.ignite.internal.util.typedef.internal.CU;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteBiTuple;
-import org.jetbrains.annotations.Nullable;
-
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
-import static org.apache.ignite.internal.util.typedef.internal.CU.retryTopologySafe;
-import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
-import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+import org.apache.ignite.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.processors.cache.transactions.*;
+import org.apache.ignite.internal.util.future.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+
+import java.io.*;
+import java.util.concurrent.*;
+import java.util.concurrent.locks.*;
+
+import static org.apache.ignite.internal.util.typedef.internal.CU.*;
+import static org.apache.ignite.transactions.TransactionConcurrency.*;
+import static org.apache.ignite.transactions.TransactionIsolation.*;
 
 /**
  * Cache sequence implementation.
@@ -87,26 +72,32 @@ public final class GridCacheAtomicSequenceImpl implements GridCacheAtomicSequenc
     /** Local value of sequence. */
     private long locVal;
 
-    /**  Upper bound of local counter. */
-    private long upBound;
+    /** Upper bound of local counter. */
+    private long upBound; // TODO should be not included
 
-    /**  Sequence batch size */
-    private volatile int batchSize;
+    /** Reserved bottom bound of local counter (included). */
+    private long reservedBottomBound;
 
-    /** Synchronization lock. */
-    private final Lock lock = new ReentrantLock();
+    /** Reserved upper bound of local counter (not included). */
+    private long reservedUpBound;
+
+    /** A limit after which a new reservation should be done. */
+    private long newReservationLine;
 
-    /** Await condition. */
-    private Condition cond = lock.newCondition();
+    /** Whether reserveFuture already processed or not. */
+    private boolean isReserveFutResultsProcessed = true;
 
-    /** Callable for execution {@link #incrementAndGet} operation in async and sync mode.  */
-    private final Callable<Long> incAndGetCall = internalUpdate(1, true);
+    /** Reserve percentage of the batch size after which a new reservation is started (default 80%). */
+    private volatile int percentage;
 
-    /** Callable for execution {@link #getAndIncrement} operation in async and sync mode.  */
-    private final Callable<Long> getAndIncCall = internalUpdate(1, false);
+    /** Sequence batch size */
+    private volatile int batchSize;
+
+    /** Synchronization lock. */
+    private final Lock lock = new ReentrantLock();
 
-    /** Add and get cache call guard. */
-    private final AtomicBoolean updateGuard = new AtomicBoolean();
+    /** Reservation future. */
+    private IgniteInternalFuture<?> reservationFut = new GridFinishedFuture<>();
 
     /**
      * Empty constructor required by {@link Externalizable}.
@@ -131,13 +122,15 @@ public final class GridCacheAtomicSequenceImpl implements GridCacheAtomicSequenc
         IgniteInternalCache<GridCacheInternalKey, GridCacheAtomicSequenceValue> seqView,
         GridCacheContext ctx,
         int batchSize,
+        int percentage,
         long locVal,
-        long upBound)
-    {
+        long upBound) {
         assert key != null;
         assert seqView != null;
         assert ctx != null;
+        assert batchSize > 0 : "BatchSize: " + batchSize;
         assert locVal <= upBound;
+        assert percentage >= 0 && percentage <= 100 : "Percentage: " + percentage;
 
         this.batchSize = batchSize;
         this.ctx = ctx;
@@ -146,6 +139,9 @@ public final class GridCacheAtomicSequenceImpl implements GridCacheAtomicSequenc
         this.upBound = upBound;
         this.locVal = locVal;
         this.name = name;
+        this.percentage = percentage;
+
+        newReservationLine = locVal + (batchSize * percentage / 100);
 
         log = ctx.logger(getClass());
     }
@@ -172,7 +168,7 @@ public final class GridCacheAtomicSequenceImpl implements GridCacheAtomicSequenc
     /** {@inheritDoc} */
     @Override public long incrementAndGet() {
         try {
-            return internalUpdate(1, incAndGetCall, true);
+            return internalUpdate(1, true);
         }
         catch (IgniteCheckedException e) {
             throw U.convertException(e);
@@ -182,7 +178,7 @@ public final class GridCacheAtomicSequenceImpl implements GridCacheAtomicSequenc
     /** {@inheritDoc} */
     @Override public long getAndIncrement() {
         try {
-            return internalUpdate(1, getAndIncCall, false);
+            return internalUpdate(1, false);
         }
         catch (IgniteCheckedException e) {
             throw U.convertException(e);
@@ -194,7 +190,7 @@ public final class GridCacheAtomicSequenceImpl implements GridCacheAtomicSequenc
         A.ensure(l > 0, " Parameter mustn't be less then 1: " + l);
 
         try {
-            return internalUpdate(l, null, true);
+            return internalUpdate(l, true);
         }
         catch (IgniteCheckedException e) {
             throw U.convertException(e);
@@ -206,7 +202,7 @@ public final class GridCacheAtomicSequenceImpl implements GridCacheAtomicSequenc
         A.ensure(l > 0, " Parameter mustn't be less then 1: " + l);
 
         try {
-            return internalUpdate(l, null, false);
+            return internalUpdate(l, false);
         }
         catch (IgniteCheckedException e) {
             throw U.convertException(e);
@@ -217,162 +213,134 @@ public final class GridCacheAtomicSequenceImpl implements GridCacheAtomicSequenc
      * Synchronous sequence update operation. Will add given amount to the sequence value.
      *
      * @param l Increment amount.
-     * @param updateCall Cache call that will update sequence reservation count in accordance with l.
      * @param updated If {@code true}, will return sequence value after update, otherwise will return sequence value
-     *      prior to update.
+     * prior to update.
      * @return Sequence value.
      * @throws IgniteCheckedException If update failed.
      */
     @SuppressWarnings("SignalWithoutCorrespondingAwait")
-    private long internalUpdate(long l, @Nullable Callable<Long> updateCall, boolean updated) throws IgniteCheckedException {
-        checkRemoved();
-
+    private long internalUpdate(final long l, final boolean updated) throws IgniteCheckedException {
         assert l > 0;
 
-        lock.lock();
+        while (true) {
+            checkRemoved();
 
-        try {
-            // If reserved range isn't exhausted.
-            if (locVal + l <= upBound) {
-                long curVal = locVal;
+            lock.lock(); // TODO locks here?
 
-                locVal += l;
+            try {
+                if (locVal + l >= newReservationLine && isReserveFutResultsProcessed && reservationFut.isDone())
+                    reservationFut = runAsyncReservation(0);
 
-                return updated ? locVal : curVal;
-            }
-        }
-        finally {
-            lock.unlock();
-        }
+                // If reserved range isn't exhausted.
+                if (locVal + l < upBound) {
+                    long curVal = locVal;
 
-        if (updateCall == null)
-            updateCall = internalUpdate(l, updated);
+                    locVal += l;
 
-        while (true) {
-            if (updateGuard.compareAndSet(false, true)) {
-                try {
-                    // This call must be outside lock.
-                    return CU.outTx(updateCall, ctx);
+                    return updated ? locVal : curVal;
                 }
-                finally {
-                    lock.lock();
 
-                    try {
-                        updateGuard.set(false);
+                if (!isReserveFutResultsProcessed && reservationFut.isDone()) {
+                    isReserveFutResultsProcessed = true;
 
-                        cond.signalAll();
-                    }
-                    finally {
-                        lock.unlock();
-                    }
-                }
-            }
-            else {
-                lock.lock();
+                    if (locVal + l < reservedUpBound) {
+                        long curVal = locVal;
 
-                try {
-                    while (locVal >= upBound && updateGuard.get())
-                        U.await(cond, 500, MILLISECONDS);
+                        locVal = (locVal + l < reservedBottomBound) ? reservedBottomBound : locVal + l;
 
-                    checkRemoved();
+                        upBound = reservedUpBound;
 
-                    // If reserved range isn't exhausted.
-                    if (locVal + l <= upBound) {
-                        long curVal = locVal;
+                        return updated ? locVal : curVal;
+                    }
+                    else {
+                        long diff = locVal + l - reservedUpBound;
 
-                        locVal += l;
+                        long off = (diff / batchSize) * batchSize;
 
-                        return updated ? locVal : curVal;
+                        reservationFut = runAsyncReservation(off);
                     }
                 }
-                finally {
-                    lock.unlock();
-                }
             }
+            finally {
+                lock.unlock();
+            }
+
+            // If reserved range is exhausted.
+            reservationFut.get();
         }
     }
 
     /**
-     * Asynchronous sequence update operation. Will add given amount to the sequence value.
+     * Runs async reservation of new range for current node.
      *
-     * @param l Increment amount.
-     * @param updateCall Cache call that will update sequence reservation count in accordance with l.
-     * @param updated If {@code true}, will return sequence value after update, otherwise will return sequence value
-     *      prior to update.
-     * @return Future indicating sequence value.
-     * @throws IgniteCheckedException If update failed.
+     * @param off Offset.
+     * @return Future.
      */
-    @SuppressWarnings("SignalWithoutCorrespondingAwait")
-    private IgniteInternalFuture<Long> internalUpdateAsync(long l, @Nullable Callable<Long> updateCall, boolean updated)
-        throws IgniteCheckedException {
-        checkRemoved();
+    private IgniteInternalFuture<?> runAsyncReservation(final long off) {
+        assert off >= 0 : "Offset: " + off;
 
-        A.ensure(l > 0, " Parameter mustn't be less then 1: " + l);
+        return ctx.kernalContext().closure().runLocalSafe(new Runnable() {
+            @Override public void run() {
+                Callable<Void> reserveCall = retryTopologySafe(new Callable<Void>() {
+                    @Override public Void call() throws Exception {
+                        try (IgniteInternalTx tx = CU.txStartInternal(ctx, seqView, PESSIMISTIC, REPEATABLE_READ)) {
+                            GridCacheAtomicSequenceValue seq = seqView.get(key);
 
-        lock.lock();
+                            checkRemoved();
 
-        try {
-            // If reserved range isn't exhausted.
-            if (locVal + l <= upBound) {
-                long curVal = locVal;
+                            assert seq != null;
 
-                locVal += l;
+                            long newUpBound = -1;
 
-                return new GridFinishedFuture<>(updated ? locVal : curVal);
-            }
-        }
-        finally {
-            lock.unlock();
-        }
+                            lock.lock();
 
-        if (updateCall == null)
-            updateCall = internalUpdate(l, updated);
+                            try {
+                                assert isReserveFutResultsProcessed;
 
-        while (true) {
-            if (updateGuard.compareAndSet(false, true)) {
-                try {
-                    // This call must be outside lock.
-                    return ctx.closures().callLocalSafe(updateCall, true);
-                }
-                finally {
-                    lock.lock();
+                                isReserveFutResultsProcessed = false;
 
-                    try {
-                        updateGuard.set(false);
+                                long curGlobalVal = seq.get();
 
-                        cond.signalAll();
-                    }
-                    finally {
-                        lock.unlock();
-                    }
-                }
-            }
-            else {
-                lock.lock();
+                                reservedBottomBound = curGlobalVal + off;
 
-                try {
-                    while (locVal >= upBound && updateGuard.get())
-                        U.await(cond, 500, MILLISECONDS);
+                                newUpBound = reservedBottomBound + batchSize;
 
-                    checkRemoved();
+                                reservedUpBound = newUpBound;
 
-                    // If reserved range isn't exhausted.
-                    if (locVal + l <= upBound) {
-                        long curVal = locVal;
+                                newReservationLine = reservedBottomBound + (batchSize * percentage / 100);
+                            }
+                            finally {
+                                lock.unlock();
+                            }
+
+                            seq.set(newUpBound);
 
-                        locVal += l;
+                            seqView.put(key, seq);
 
-                        return new GridFinishedFuture<>(updated ? locVal : curVal);
+                            tx.commit();
+                        }
+                        catch (Error | Exception e) {
+                            U.error(log, "Failed to get and add: " + this, e);
+
+                            throw e;
+                        }
+
+                        return null;
                     }
+                });
+
+                try {
+                    CU.outTx(reserveCall, ctx);
                 }
-                finally {
-                    lock.unlock();
+                catch (IgniteCheckedException e) {
+                    throw new IgniteException(e);
                 }
             }
-        }
+        }, /*sys pool*/ false);
     }
 
-    /** Get local batch size for this sequences.
+    /**
+     * Gets local batch size for this sequence.
      *
      * @return Sequence batch size.
      */
@@ -398,6 +366,25 @@ public final class GridCacheAtomicSequenceImpl implements GridCacheAtomicSequenc
         }
     }
 
+    /** {@inheritDoc} */
+    @Override public int reservePercentage() {
+        return percentage;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void reservePercentage(int percentage) {
+        A.ensure(percentage >= 0 && percentage <= 100, "Invalid reserve percentage: " + percentage);
+
+        lock.lock();
+
+        try {
+            this.percentage = percentage;
+        }
+        finally {
+            lock.unlock();
+        }
+    }
+
     /**
      * Check removed status.
      *
@@ -465,89 +452,6 @@ public final class GridCacheAtomicSequenceImpl implements GridCacheAtomicSequenc
         }
     }
 
-    /**
-     * Method returns callable for execution all update operations in async and sync mode.
-     *
-     * @param l Value will be added to sequence.
-     * @param updated If {@code true}, will return updated value, if {@code false}, will return previous value.
-     * @return Callable for execution in async and sync mode.
-     */
-    @SuppressWarnings("TooBroadScope")
-    private Callable<Long> internalUpdate(final long l, final boolean updated) {
-        return retryTopologySafe(new Callable<Long>() {
-            @Override public Long call() throws Exception {
-                try (IgniteInternalTx tx = CU.txStartInternal(ctx, seqView, PESSIMISTIC, REPEATABLE_READ)) {
-                    GridCacheAtomicSequenceValue seq = seqView.get(key);
-
-                    checkRemoved();
-
-                    assert seq != null;
-
-                    long curLocVal;
-
-                    long newUpBound;
-
-                    lock.lock();
-
-                    try {
-                        curLocVal = locVal;
-
-                        // If local range was already reserved in another thread.
-                        if (locVal + l <= upBound) {
-                            long retVal = locVal;
-
-                            locVal += l;
-
-                            return updated ? locVal : retVal;
-                        }
-
-                        long curGlobalVal = seq.get();
-
-                        long newLocVal;
-
-                        /* We should use offset because we already reserved left side of range.*/
-                        long off = batchSize > 1 ? batchSize - 1 : 1;
-
-                        // Calculate new values for local counter, global counter and upper bound.
-                        if (curLocVal + l >= curGlobalVal) {
-                            newLocVal = curLocVal + l;
-
-                            newUpBound = newLocVal + off;
-                        }
-                        else {
-                            newLocVal = curGlobalVal;
-
-                            newUpBound = newLocVal + off;
-                        }
-
-                        locVal = newLocVal;
-                        upBound = newUpBound;
-
-                        if (updated)
-                            curLocVal = newLocVal;
-                    }
-                    finally {
-                        lock.unlock();
-                    }
-
-                    // Global counter must be more than reserved upper bound.
-                    seq.set(newUpBound + 1);
-
-                    seqView.put(key, seq);
-
-                    tx.commit();
-
-                    return curLocVal;
-                }
-                catch (Error | Exception e) {
-                    U.error(log, "Failed to get and add: " + this, e);
-
-                    throw e;
-                }
-            }
-        });
-    }
-
     /** {@inheritDoc} */
     @Override public void writeExternal(ObjectOutput out) throws IOException {
         out.writeObject(ctx.kernalContext());

http://git-wip-us.apache.org/repos/asf/ignite/blob/784958bf/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAtomicSequenceMultiThreadedAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAtomicSequenceMultiThreadedAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAtomicSequenceMultiThreadedAbstractTest.java
new file mode 100644
index 0000000..10b57e8
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAtomicSequenceMultiThreadedAbstractTest.java
@@ -0,0 +1,579 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.datastructures;
+
+import java.util.Random;
+import java.util.UUID;
+import org.apache.ignite.IgniteAtomicSequence;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.configuration.AtomicConfiguration;
+import org.apache.ignite.internal.processors.datastructures.GridCacheAtomicSequenceImpl;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.GridTestUtils;
+
+/**
+ * Base class for atomic sequence multi-threaded tests.
+ */
+public abstract class GridCacheAtomicSequenceMultiThreadedAbstractTest extends IgniteAtomicsAbstractTest {
+    /** Number of threads for multithreaded test. */
+    private static final int THREAD_NUM = 30;
+
+    /** Number of iterations per thread for multithreaded test. */
+    private static final int ITERATION_NUM = 10000;
+
+    /** {@inheritDoc} */
+    @Override protected int gridCount() {
+        return 1;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected AtomicConfiguration atomicConfiguration() {
+        AtomicConfiguration cfg = super.atomicConfiguration();
+
+        cfg.setBackups(1);
+        cfg.setAtomicSequenceReserveSize(10);
+
+        return cfg;
+    }
+
+    /** @throws Exception If failed. */
+    public void testValues() throws Exception {
+        String seqName = UUID.randomUUID().toString();
+
+        final GridCacheAtomicSequenceImpl seq = (GridCacheAtomicSequenceImpl)grid(0).atomicSequence(seqName, 0, true);
+
+        // Local reservations.
+        assertEquals(1, seq.incrementAndGet());
+        assertEquals(1, seq.getAndIncrement()); // Seq = 2
+        assertEquals(3L, seq.incrementAndGet());
+        assertEquals(3L, seq.getAndIncrement()); // Seq=4
+
+        assertEquals(4, seq.getAndAdd(3));
+        assertEquals(9, seq.addAndGet(2));
+
+        assertEquals(new Long(9L), U.field(seq, "locVal"));
+        assertEquals(new Long(10L), U.field(seq, "upBound"));
+
+        // Cache calls.
+        assertEquals(10, seq.incrementAndGet());
+
+        assertEquals(new Long(10L), U.field(seq, "locVal"));
+        assertEquals(new Long(20L), U.field(seq, "upBound"));
+
+        seq.addAndGet(9);
+
+        assertEquals(new Long(19L), U.field(seq, "locVal"));
+        assertEquals(new Long(20L), U.field(seq, "upBound"));
+
+        assertEquals(20L, seq.incrementAndGet());
+
+        assertEquals(new Long(20L), U.field(seq, "locVal"));
+        assertEquals(new Long(30L), U.field(seq, "upBound"));
+
+        seq.addAndGet(9);
+
+        assertEquals(new Long(29L), U.field(seq, "locVal"));
+        assertEquals(new Long(30L), U.field(seq, "upBound"));
+
+        assertEquals(29, seq.getAndIncrement());
+
+        assertEquals(new Long(30L), U.field(seq, "locVal"));
+        assertEquals(new Long(40L), U.field(seq, "upBound"));
+
+        seq.addAndGet(9);
+
+        assertEquals(new Long(39L), U.field(seq, "locVal"));
+        assertEquals(new Long(40L), U.field(seq, "upBound"));
+
+        assertEquals(39L, seq.getAndIncrement());
+
+        assertEquals(new Long(40L), U.field(seq, "locVal"));
+        assertEquals(new Long(50L), U.field(seq, "upBound"));
+
+        seq.addAndGet(9);
+
+        assertEquals(new Long(49L), U.field(seq, "locVal"));
+        assertEquals(new Long(50L), U.field(seq, "upBound"));
+
+        assertEquals(50, seq.addAndGet(1));
+
+        assertEquals(new Long(50L), U.field(seq, "locVal"));
+        assertEquals(new Long(60L), U.field(seq, "upBound"));
+
+        seq.addAndGet(9);
+
+        assertEquals(new Long(59L), U.field(seq, "locVal"));
+        assertEquals(new Long(60L), U.field(seq, "upBound"));
+
+        assertEquals(59, seq.getAndAdd(1));
+
+        assertEquals(new Long(60L), U.field(seq, "locVal"));
+        assertEquals(new Long(70L), U.field(seq, "upBound"));
+    }
+
+    /** @throws Exception If failed. */
+    public void testValues2() throws Exception {
+        String seqName = UUID.randomUUID().toString();
+
+        final GridCacheAtomicSequenceImpl seq = (GridCacheAtomicSequenceImpl)grid(0).atomicSequence(seqName, 10, true);
+
+        assertSeqFields(seq, /*locVal*/ 10, /*upBound*/ 20, /*resBound*/ 18, /*resBottomBound*/ 0, /*resUpBound*/ 0);
+
+        assertEquals(17, seq.addAndGet(7));
+
+        assertSeqFields(seq, /*locVal*/ 17, /*upBound*/ 20, /*resBound*/ 18, /*resBottomBound*/ 0, /*resUpBound*/ 0);
+
+        assertEquals(18, seq.incrementAndGet());
+
+        GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                return !F.eq(U.field(seq, "isReserveFutResultsProcessed"), true);
+            }
+        }, 1000);
+
+        assertSeqFields(seq, /*locVal*/ 18, /*upBound*/ 20, /*resBound*/ 28, /*resBottomBound*/ 20, /*resUpBound*/ 30);
+
+        assertEquals(19, seq.incrementAndGet());
+
+        assertSeqFields(seq, /*locVal*/ 19, /*upBound*/ 20, /*resBound*/ 28, /*resBottomBound*/ 20, /*resUpBound*/ 30);
+
+        assertEquals(20, seq.incrementAndGet());
+
+        assertSeqFields(seq, /*locVal*/ 20, /*upBound*/ 30, /*resBound*/ 28, /*resBottomBound*/ 20, /*resUpBound*/ 30);
+    }
+
+    /** @throws Exception If failed. */
+    public void testValuesPercentage50() throws Exception {
+        String seqName = UUID.randomUUID().toString();
+
+        final GridCacheAtomicSequenceImpl seq = (GridCacheAtomicSequenceImpl)grid(0).atomicSequence(seqName, 0, true);
+
+        seq.reservePercentage(50);
+
+        assertSeqFields(seq, /*locVal*/ 0, /*upBound*/ 10, /*resBound*/ 8, /*resBottomBound*/ 0, /*resUpBound*/ 0);
+
+        // Exhaust the first reserved range so that the values are recalculated according to the new reserve percentage.
+        assertEquals(10, seq.addAndGet(10));
+
+        assertSeqFields(seq, /*locVal*/ 10, /*upBound*/ 20, /*resBound*/ 15, /*resBottomBound*/ 10, /*resUpBound*/ 20);
+
+        assertEquals(15, seq.addAndGet(5));
+
+        GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                return !F.eq(U.field(seq, "isReserveFutResultsProcessed"), true);
+            }
+        }, 1000);
+
+        assertSeqFields(seq, /*locVal*/ 15, /*upBound*/ 20, /*resBound*/ 25, /*resBottomBound*/ 20, /*resUpBound*/ 30);
+    }
+
+    /** @throws Exception If failed. */
+    public void testValuesPercentage0() throws Exception {
+        String seqName = UUID.randomUUID().toString();
+
+        final GridCacheAtomicSequenceImpl seq = (GridCacheAtomicSequenceImpl)grid(0).atomicSequence(seqName, 0, true);
+
+        seq.reservePercentage(0);
+
+        assertSeqFields(seq, /*locVal*/ 0, /*upBound*/ 10, /*resBound*/ 8, /*resBottomBound*/ 0, /*resUpBound*/ 0);
+
+        // Exhaust the first reserved range so that the values are recalculated according to the new reserve percentage.
+        assertEquals(10, seq.addAndGet(10));
+
+        assertSeqFields(seq, /*locVal*/ 10, /*upBound*/ 20, /*resBound*/ 10, /*resBottomBound*/ 10, /*resUpBound*/ 20);
+
+        assertEquals(11, seq.addAndGet(1));
+
+        GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                return !F.eq(U.field(seq, "isReserveFutResultsProcessed"), true);
+            }
+        }, 1000);
+
+        assertSeqFields(seq, /*locVal*/ 11, /*upBound*/ 20, /*resBound*/ 20, /*resBottomBound*/ 20, /*resUpBound*/ 30);
+
+        assertEquals(12, seq.incrementAndGet());
+
+        assertSeqFields(seq, /*locVal*/ 12, /*upBound*/ 20, /*resBound*/ 20, /*resBottomBound*/ 20, /*resUpBound*/ 30);
+
+        assertEquals(20, seq.addAndGet(8));
+
+        assertSeqFields(seq, /*locVal*/ 20, /*upBound*/ 30, /*resBound*/ 20, /*resBottomBound*/ 20, /*resUpBound*/ 30);
+    }
+
+    /** @throws Exception If failed. */
+    public void testValuesPercentage100() throws Exception {
+        String seqName = UUID.randomUUID().toString();
+
+        final GridCacheAtomicSequenceImpl seq = (GridCacheAtomicSequenceImpl)grid(0).atomicSequence(seqName, 0, true);
+
+        seq.reservePercentage(100);
+
+        assertSeqFields(seq, /*locVal*/ 0, /*upBound*/ 10, /*resBound*/ 8, /*resBottomBound*/ 0, /*resUpBound*/ 0);
+
+        // Exhaust the first reserved range so that the values are recalculated according to the new reserve percentage.
+        assertEquals(10, seq.addAndGet(10));
+
+        assertSeqFields(seq, /*locVal*/ 10, /*upBound*/ 20, /*resBound*/ 20, /*resBottomBound*/ 10, /*resUpBound*/ 20);
+
+        assertEquals(19, seq.addAndGet(9));
+
+        assertSeqFields(seq, /*locVal*/ 19, /*upBound*/ 20, /*resBound*/ 20, /*resBottomBound*/ 10, /*resUpBound*/ 20);
+
+        assertEquals(20, seq.incrementAndGet());
+
+        assertSeqFields(seq, /*locVal*/ 20, /*upBound*/ 30, /*resBound*/ 30, /*resBottomBound*/ 20, /*resUpBound*/ 30);
+    }
+
+    /** @throws Exception If failed. */
+    public void testValuesDoubleReservation() throws Exception {
+        String seqName = UUID.randomUUID().toString();
+
+        final GridCacheAtomicSequenceImpl seq = (GridCacheAtomicSequenceImpl)grid(0).atomicSequence(seqName, 0, true);
+
+        assertSeqFields(seq, /*locVal*/ 0, /*upBound*/ 10, /*resBound*/ 8, /*resBottomBound*/ 0, /*resUpBound*/ 0);
+
+        assertEquals(30, seq.addAndGet(30));
+    }
+
+    /** @throws Exception If failed. */
+    public void testValues2Nodes() throws Exception {
+        String seqName = UUID.randomUUID().toString();
+
+        startGrid(1);
+
+        try {
+            final GridCacheAtomicSequenceImpl seq1 = (GridCacheAtomicSequenceImpl)grid(0).atomicSequence(seqName, 0, true);
+            final GridCacheAtomicSequenceImpl seq2 = (GridCacheAtomicSequenceImpl)grid(1).atomicSequence(seqName, 0, false);
+
+            assertSeqFields(seq1, /*locVal*/ 0, /*upBound*/ 10, /*resBound*/ 8, /*resBottomBound*/ 0, /*resUpBound*/ 0);
+            assertSeqFields(seq2, /*locVal*/ 10, /*upBound*/ 20, /*resBound*/ 18, /*resBottomBound*/ 0, /*resUpBound*/ 0);
+
+            assertEquals(1, seq1.incrementAndGet());
+            assertEquals(11, seq2.incrementAndGet());
+
+            assertSeqFields(seq1, /*locVal*/ 1, /*upBound*/ 10, /*resBound*/ 8, /*resBottomBound*/ 0, /*resUpBound*/ 0);
+            assertSeqFields(seq2, /*locVal*/ 11, /*upBound*/ 20, /*resBound*/ 18, /*resBottomBound*/ 0, /*resUpBound*/ 0);
+
+            assertEquals(7, seq1.addAndGet(6));
+            assertEquals(17, seq2.addAndGet(6));
+
+            assertSeqFields(seq1, /*locVal*/ 7, /*upBound*/ 10, /*resBound*/ 8, /*resBottomBound*/ 0, /*resUpBound*/ 0);
+            assertSeqFields(seq2, /*locVal*/ 17, /*upBound*/ 20, /*resBound*/ 18, /*resBottomBound*/ 0, /*resUpBound*/ 0);
+
+            // New reservation (reverse order)
+            assertEquals(18, seq2.incrementAndGet());
+
+            assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() {
+                @Override public boolean apply() {
+                    return !F.eq(U.field(seq2, "isReserveFutResultsProcessed"), true);
+                }
+            }, 1000));
+
+            assertEquals(8, seq1.incrementAndGet());
+
+            assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() {
+                @Override public boolean apply() {
+                    return !F.eq(U.field(seq1, "isReserveFutResultsProcessed"), true);
+                }
+            }, 1000));
+
+            assertSeqFields(seq1, /*locVal*/ 8, /*upBound*/ 10, /*resBound*/ 38, /*resBottomBound*/ 30, /*resUpBound*/ 40);
+            assertSeqFields(seq2, /*locVal*/ 18, /*upBound*/ 20, /*resBound*/ 28, /*resBottomBound*/ 20, /*resUpBound*/ 30);
+
+            assertEquals(30, seq1.addAndGet(7));
+            assertEquals(20, seq2.addAndGet(2));
+
+            assertSeqFields(seq1, /*locVal*/ 30, /*upBound*/ 40, /*resBound*/ 38, /*resBottomBound*/ 30, /*resUpBound*/ 40);
+            assertSeqFields(seq2, /*locVal*/ 20, /*upBound*/ 30, /*resBound*/ 28, /*resBottomBound*/ 20, /*resUpBound*/ 30);
+        }
+        finally {
+            stopGrid(1);
+        }
+    }
+
+    /**
+     * @param seq Sequence.
+     * @param locVal Local value.
+     * @param upBound Up bound.
+     * @param newReservationLine Reservation bound.
+     * @param reservedBottomBound Reservation bottom bound.
+     * @param reservedUpBound Reservation up bound.
+     */
+    private void assertSeqFields(GridCacheAtomicSequenceImpl seq, long locVal, long upBound, long newReservationLine,
+        long reservedBottomBound, long reservedUpBound) {
+        assertEquals(new Long(locVal), U.field(seq, "locVal"));
+        assertEquals(new Long(upBound), U.field(seq, "upBound"));
+        assertEquals(new Long(newReservationLine), U.field(seq, "newReservationLine"));
+        assertEquals(new Long(reservedBottomBound), U.field(seq, "reservedBottomBound"));
+        assertEquals(new Long(reservedUpBound), U.field(seq, "reservedUpBound"));
+    }
+
+    /** @throws Exception If failed. */
+    public void testValues2NodesDoubleReservation() throws Exception {
+        String seqName = UUID.randomUUID().toString();
+
+        startGrid(1);
+
+        try {
+            final GridCacheAtomicSequenceImpl seq1 = (GridCacheAtomicSequenceImpl)grid(0).atomicSequence(seqName, 0, true);
+            final GridCacheAtomicSequenceImpl seq2 = (GridCacheAtomicSequenceImpl)grid(1).atomicSequence(seqName, 0, false);
+
+            assertEquals(1, seq1.incrementAndGet());
+            assertEquals(11, seq2.incrementAndGet());
+
+            assertEquals(new Long(1L), U.field(seq1, "locVal"));
+            assertEquals(new Long(10L), U.field(seq1, "upBound"));
+            assertEquals(new Long(11L), U.field(seq2, "locVal"));
+            assertEquals(new Long(20L), U.field(seq2, "upBound"));
+
+            assertEquals(31, seq2.addAndGet(20));
+
+            assertEquals(new Long(1L), U.field(seq1, "locVal"));
+            assertEquals(new Long(10L), U.field(seq1, "upBound"));
+            assertEquals(new Long(31L), U.field(seq2, "locVal"));
+            assertEquals(new Long(40L), U.field(seq2, "upBound"));
+
+            // Jump
+            assertEquals(40, seq1.addAndGet(23));
+        }
+        finally {
+            stopGrid(1);
+        }
+    }
+
+    /** @throws Exception If failed. */
+    public void testUpdatedSync() throws Exception {
+        checkUpdate(true);
+    }
+
+    /** @throws Exception If failed. */
+    public void testPreviousSync() throws Exception {
+        checkUpdate(false);
+    }
+
+    /** @throws Exception If failed. */
+    public void testIncrementAndGet() throws Exception {
+        // Random sequence names.
+        String seqName = UUID.randomUUID().toString();
+
+        final IgniteAtomicSequence seq = grid(0).atomicSequence(seqName, 0L, true);
+
+        runSequenceClosure(new GridInUnsafeClosure<IgniteAtomicSequence>() {
+            @Override public void apply(IgniteAtomicSequence t) {
+                t.incrementAndGet();
+            }
+        }, seq, ITERATION_NUM, THREAD_NUM);
+
+        assertEquals(ITERATION_NUM * THREAD_NUM, seq.get());
+    }
+
+    /** @throws Exception If failed. */
+    public void testIncrementAndGet2Nodes() throws Exception {
+        startGrid(1);
+
+        try {
+            String seqName = UUID.randomUUID().toString();
+
+            final IgniteAtomicSequence seq1 = grid(0).atomicSequence(seqName, 0L, true);
+            final IgniteAtomicSequence seq2 = grid(1).atomicSequence(seqName, 0L, true);
+
+            multithreaded(new Runnable() {
+                @Override public void run() {
+                    for (int i = 0; i < ITERATION_NUM; i++) {
+                        if (i % 2 == 0)
+                            seq1.incrementAndGet();
+                        else
+                            seq2.incrementAndGet();
+                    }
+                }
+            }, THREAD_NUM);
+
+            long seq1Val = seq1.get();
+            long seq2Val = seq2.get();
+
+            assertEquals(ITERATION_NUM * THREAD_NUM + (seq1Val < seq2Val ? 0 : 10), seq1Val);
+            assertEquals(ITERATION_NUM * THREAD_NUM + (seq1Val < seq2Val ? 10 : 0), seq2Val);
+        }
+        finally {
+            stopGrid(1);
+        }
+    }
+
+    /** @throws Exception If failed. */
+    public void testGetAndIncrement() throws Exception {
+        // Random sequence names.
+        String seqName = UUID.randomUUID().toString();
+
+        final IgniteAtomicSequence seq = grid(0).atomicSequence(seqName, 0L, true);
+
+        runSequenceClosure(new GridInUnsafeClosure<IgniteAtomicSequence>() {
+            @Override public void apply(IgniteAtomicSequence t) {
+                t.getAndIncrement();
+            }
+        }, seq, ITERATION_NUM, THREAD_NUM);
+
+        assertEquals(ITERATION_NUM * THREAD_NUM, seq.get());
+    }
+
+    /** @throws Exception If failed. */
+    public void testAddAndGet() throws Exception {
+        // Random sequence names.
+        String seqName = UUID.randomUUID().toString();
+
+        final IgniteAtomicSequence seq = grid(0).atomicSequence(seqName, 0L, true);
+
+        runSequenceClosure(new GridInUnsafeClosure<IgniteAtomicSequence>() {
+            @Override public void apply(IgniteAtomicSequence t) {
+                t.addAndGet(5);
+            }
+        }, seq, ITERATION_NUM, THREAD_NUM);
+
+        assertEquals(5 * ITERATION_NUM * THREAD_NUM, seq.get());
+    }
+
+    /** @throws Exception If failed. */
+    public void testGetAndAdd() throws Exception {
+        checkGetAndAdd(5); // Every concurrent getAndAdd() call uses a delta of 5.
+    }
+
+    /** @throws Exception If failed. */
+    public void testGetAndAdd2() throws Exception {
+        checkGetAndAdd(3); // Same check as testGetAndAdd(), but with a delta of 3 instead of 5.
+    }
+
+    /**
+     * Adds {@code val} to a fresh sequence from many threads and checks the resulting total.
+     *
+     * @param val Delta passed to every {@code getAndAdd} call.
+     * @throws Exception If failed.
+     */
+    private void checkGetAndAdd(final int val) throws Exception {
+        final IgniteAtomicSequence seq = grid(0).atomicSequence(UUID.randomUUID().toString(), 0L, true);
+
+        runSequenceClosure(new GridInUnsafeClosure<IgniteAtomicSequence>() {
+            @Override public void apply(IgniteAtomicSequence s) {
+                s.getAndAdd(val);
+            }
+        }, seq, ITERATION_NUM, THREAD_NUM);
+
+        // THREAD_NUM threads each add val ITERATION_NUM times, starting from 0.
+        assertEquals(val * ITERATION_NUM * THREAD_NUM, seq.get());
+    }
+
+    /** @throws Exception If failed. */
+    public void testMixed1() throws Exception {
+        // A random name gives this test a sequence of its own.
+        final IgniteAtomicSequence seq = grid(0).atomicSequence(UUID.randomUUID().toString(), 0L, true);
+
+        GridInUnsafeClosure<IgniteAtomicSequence> mixedOps = new GridInUnsafeClosure<IgniteAtomicSequence>() {
+            @Override public void apply(IgniteAtomicSequence s) {
+                s.incrementAndGet(); // +1
+                s.getAndIncrement(); // +1
+                s.incrementAndGet(); // +1
+                s.getAndIncrement(); // +1
+                s.getAndAdd(3);      // +3
+                s.addAndGet(3);      // +3, i.e. 10 per invocation.
+            }
+        };
+
+        runSequenceClosure(mixedOps, seq, ITERATION_NUM, THREAD_NUM);
+
+        assertEquals(10 * ITERATION_NUM * THREAD_NUM, seq.get());
+    }
+
+    /** @throws Exception If failed. */
+    public void testMixed2() throws Exception {
+        // A random name gives this test a sequence of its own.
+        final IgniteAtomicSequence seq = grid(0).atomicSequence(UUID.randomUUID().toString(), 0L, true);
+
+        GridInUnsafeClosure<IgniteAtomicSequence> mixedAdds = new GridInUnsafeClosure<IgniteAtomicSequence>() {
+            @Override public void apply(IgniteAtomicSequence s) {
+                s.getAndAdd(2);
+                s.addAndGet(3);
+                s.addAndGet(5);
+                s.getAndAdd(7); // 2 + 3 + 5 + 7 = 17 per invocation.
+            }
+        };
+
+        runSequenceClosure(mixedAdds, seq, ITERATION_NUM, THREAD_NUM);
+
+        assertEquals(17 * ITERATION_NUM * THREAD_NUM, seq.get());
+    }
+
+    /**
+     * Runs the closure {@code cnt} times against the sequence in each of {@code threadCnt} threads.
+     *
+     * @param c Closure to execute.
+     * @param seq Sequence handed to every closure invocation.
+     * @param cnt Number of invocations per thread.
+     * @param threadCnt Number of threads.
+     * @throws Exception If failed.
+     */
+    protected void runSequenceClosure(final GridInUnsafeClosure<IgniteAtomicSequence> c,
+        final IgniteAtomicSequence seq, final int cnt, final int threadCnt) throws Exception {
+        multithreaded(new Runnable() {
+            @Override public void run() {
+                try {
+                    for (int left = cnt; left > 0; left--)
+                        c.apply(seq);
+                }
+                catch (IgniteCheckedException ex) {
+                    throw new RuntimeException(ex); // run() cannot throw checked exceptions.
+                }
+            }
+        }, threadCnt);
+    }
+
+    /**
+     * Single-threaded check of the values returned by {@code addAndGet} and {@code getAndAdd}.
+     *
+     * @param updated {@code true} to check {@code addAndGet}, {@code false} to check {@code getAndAdd}.
+     * @throws Exception If failed.
+     */
+    @SuppressWarnings("IfMayBeConditional")
+    private void checkUpdate(boolean updated) throws Exception {
+        final IgniteAtomicSequence seq = grid(0).atomicSequence(UUID.randomUUID().toString(), 0L, true);
+        Random rnd = new Random();
+        long before = 0;
+
+        for (int i = 0; i < ITERATION_NUM; i++) {
+            long step = rnd.nextInt(10) + 1;
+            long after = before + step;
+
+            if (updated)
+                assertEquals(after, seq.addAndGet(step)); // Expected: value after the addition.
+            else
+                assertEquals(before, seq.getAndAdd(step)); // Expected: value before the addition.
+
+            before = after;
+        }
+    }
+
+    /**
+     * Single-argument closure whose body is allowed to throw {@link IgniteCheckedException}.
+     *
+     * @param <E> Closure argument type.
+     */
+    private abstract static class GridInUnsafeClosure<E> {
+        public abstract void apply(E p) throws IgniteCheckedException; // Called once per loop iteration.
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/784958bf/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheSequenceApiSelfAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheSequenceApiSelfAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheSequenceApiSelfAbstractTest.java
index d988b2c..e7a477f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheSequenceApiSelfAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheSequenceApiSelfAbstractTest.java
@@ -18,12 +18,12 @@
 package org.apache.ignite.internal.processors.cache.datastructures;
 
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.Random;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.Callable;
+
 import org.apache.ignite.IgniteAtomicSequence;
 import org.apache.ignite.configuration.AtomicConfiguration;
 import org.apache.ignite.configuration.CacheConfiguration;
@@ -36,6 +36,7 @@ import org.apache.ignite.internal.processors.datastructures.GridCacheInternalKey
 import org.apache.ignite.internal.util.typedef.G;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.transactions.Transaction;
+import org.eclipse.jetty.util.ConcurrentHashSet;
 import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.cache.CacheMode.PARTITIONED;
@@ -119,7 +120,7 @@ public abstract class GridCacheSequenceApiSelfAbstractTest extends IgniteAtomics
             assertEquals(BATCH_SIZE, seq.batchSize());
         }
 
-        assertEquals(1, G.allGrids().size());
+        assertEquals(gridCount(), G.allGrids().size());
     }
 
     /** {@inheritDoc} */
@@ -308,9 +309,29 @@ public abstract class GridCacheSequenceApiSelfAbstractTest extends IgniteAtomics
      * @throws Exception If failed.
      */
     public void testMultiThreadedSequenceIntegrity() throws Exception {
-        multiThreadedSequenceIntegrity(1, 0);
-        multiThreadedSequenceIntegrity(7, -1500);
-        multiThreadedSequenceIntegrity(3, 345);
+        multiThreadedSequenceIntegrity(/*batchSize*/ 1, /*percentage*/ 30, /*initVal*/0);
+        multiThreadedSequenceIntegrity(/*batchSize*/ 7, /*percentage*/ 0, /*initVal*/-1500);
+        multiThreadedSequenceIntegrity(/*batchSize*/ 3, /*percentage*/ 100, /*initVal*/345);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testMultiNodeSequenceIntegrity() throws Exception {
+        if (gridCount() < 2)
+            return; // Nothing to check on a single-node suite.
+
+        multiNodeSequenceIntegrity(/*batchSize*/ 1, /*percentage*/ 80, /*initVal*/0);
+        multiNodeSequenceIntegrity(/*batchSize*/ 1, /*percentage*/ 0, /*initVal*/-11);
+        multiNodeSequenceIntegrity(/*batchSize*/ 1, /*percentage*/ 100, /*initVal*/183);
+
+        multiNodeSequenceIntegrity(/*batchSize*/ 7, /*percentage*/ 20, /*initVal*/83);
+        multiNodeSequenceIntegrity(/*batchSize*/ 7, /*percentage*/ 0, /*initVal*/-17);
+        multiNodeSequenceIntegrity(/*batchSize*/ 7, /*percentage*/ 100, /*initVal*/11);
+
+        multiNodeSequenceIntegrity(/*batchSize*/ 11, /*percentage*/ 50, /*initVal*/-7);
+        multiNodeSequenceIntegrity(/*batchSize*/ 11, /*percentage*/ 0, /*initVal*/55);
+        multiNodeSequenceIntegrity(/*batchSize*/ 11, /*percentage*/ 100, /*initVal*/22);
     }
 
     /**
@@ -364,8 +385,8 @@ public abstract class GridCacheSequenceApiSelfAbstractTest extends IgniteAtomics
      * Sequence get and increment.
      *
      * @param seq Sequence for test.
-     * @throws Exception If failed.
      * @return Result of operation.
+     * @throws Exception If failed.
      */
     private long getAndIncrement(IgniteAtomicSequence seq) throws Exception {
         long locSeqVal = seq.get();
@@ -381,8 +402,8 @@ public abstract class GridCacheSequenceApiSelfAbstractTest extends IgniteAtomics
      * Sequence add and increment
      *
      * @param seq Sequence for test.
-     * @throws Exception If failed.
      * @return Result of operation.
+     * @throws Exception If failed.
      */
     private long incrementAndGet(IgniteAtomicSequence seq) throws Exception {
         long locSeqVal = seq.get();
@@ -399,8 +420,8 @@ public abstract class GridCacheSequenceApiSelfAbstractTest extends IgniteAtomics
      *
      * @param seq Sequence for test.
      * @param l Number of added elements.
-     * @throws Exception If failed.
      * @return Result of operation.
+     * @throws Exception If failed.
      */
     private long addAndGet(IgniteAtomicSequence seq, long l) throws Exception {
         long locSeqVal = seq.get();
@@ -417,8 +438,8 @@ public abstract class GridCacheSequenceApiSelfAbstractTest extends IgniteAtomics
      *
      * @param seq Sequence for test.
      * @param l Number of added elements.
-     * @throws Exception If failed.
      * @return Result of operation.
+     * @throws Exception If failed.
      */
     private long getAndAdd(IgniteAtomicSequence seq, long l) throws Exception {
         long locSeqVal = seq.get();
@@ -431,10 +452,10 @@ public abstract class GridCacheSequenceApiSelfAbstractTest extends IgniteAtomics
     }
 
     /**
-     *  Sequence integrity.
+     * Sequence integrity.
      *
      * @param batchSize Sequence batch size.
-     * @param initVal  Sequence initial value.
+     * @param initVal Sequence initial value.
      * @throws Exception If test fail.
      */
     private void sequenceIntegrity(int batchSize, long initVal) throws Exception {
@@ -465,24 +486,24 @@ public abstract class GridCacheSequenceApiSelfAbstractTest extends IgniteAtomics
     }
 
     /**
-     *  Multi-threaded integrity.
+     * Multi-threaded integrity.
      *
      * @param batchSize Sequence batch size.
-     * @param initVal  Sequence initial value.
+     * @param initVal Sequence initial value.
      * @throws Exception If test fail.
      */
-    private void multiThreadedSequenceIntegrity(int batchSize, long initVal) throws Exception {
+    private void multiThreadedSequenceIntegrity(int batchSize, int percentage, long initVal) throws Exception {
         // Random sequence names.
         String locSeqName = UUID.randomUUID().toString();
 
         // Sequence.
-        final IgniteAtomicSequence locSeq = grid().atomicSequence(locSeqName, initVal,
-            true);
+        final IgniteAtomicSequence locSeq = grid().atomicSequence(locSeqName, initVal, true);
 
         locSeq.batchSize(batchSize);
+        locSeq.reservePercentage(percentage);
 
         // Result set.
-        final Set<Long> resSet = Collections.synchronizedSet(new HashSet<Long>());
+        final Set<Long> resSet = new ConcurrentHashSet<>();
 
         // Get sequence value and try to put it result set.
         for (int i = 0; i < MAX_LOOPS_NUM; i++) {
@@ -521,7 +542,6 @@ public abstract class GridCacheSequenceApiSelfAbstractTest extends IgniteAtomics
 
             resSet.add(val);
 
-
             if (i % 100 == 0)
                 info("Finished iteration 2: " + i);
         }
@@ -539,6 +559,51 @@ public abstract class GridCacheSequenceApiSelfAbstractTest extends IgniteAtomics
     }
 
     /**
+     * Multi-node integrity: sequences obtained from every node must never hand out the same value twice.
+     *
+     * @param batchSize Sequence batch size.
+     * @param percentage Reserve percentage applied to every sequence instance.
+     * @param initVal Sequence initial value.
+     * @throws Exception If test fail.
+     */
+    private void multiNodeSequenceIntegrity(int batchSize, int percentage, long initVal) throws Exception {
+        // Random sequence names.
+        String locSeqName = UUID.randomUUID().toString();
+
+        // One instance per started node (not a fixed 3), so any suite with two or more nodes works.
+        final IgniteAtomicSequence[] locSeqs = new IgniteAtomicSequence[gridCount()];
+
+        for (int i = 0; i < locSeqs.length; i++) {
+            locSeqs[i] = grid(i).atomicSequence(locSeqName, initVal, true);
+
+            locSeqs[i].batchSize(batchSize);
+
+            locSeqs[i].reservePercentage(percentage);
+        }
+
+        final Set<Long> resSet = new ConcurrentHashSet<>();
+
+        multithreaded(
+            new Callable<Object>() {
+                @Nullable @Override public Object call() throws Exception {
+                    // Get sequence value and try to put it result set.
+                    for (int i = 0; i < MAX_LOOPS_NUM; i++) {
+                        Long val = locSeqs[i % locSeqs.length].getAndIncrement();
+
+                        // add() is a single atomic step; contains() followed by add() is racy.
+                        assertTrue("Element already in set : " + val, resSet.add(val));
+                    }
+
+                    return null;
+                }
+            }, THREAD_NUM);
+
+        assertEquals(MAX_LOOPS_NUM * THREAD_NUM, resSet.size());
+
+        removeSequence(locSeqName);
+    }
+
+    /**
      * Test sequence integrity.
      *
      * @param seq Sequence for test.

http://git-wip-us.apache.org/repos/asf/ignite/blob/784958bf/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedAtomicSequenceMultiThreadedTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedAtomicSequenceMultiThreadedTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedAtomicSequenceMultiThreadedTest.java
index 945650d..ac38fd5 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedAtomicSequenceMultiThreadedTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedAtomicSequenceMultiThreadedTest.java
@@ -17,324 +17,17 @@
 
 package org.apache.ignite.internal.processors.cache.datastructures.partitioned;
 
-import java.util.Random;
-import java.util.UUID;
-import org.apache.ignite.IgniteAtomicSequence;
-import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cache.CacheMode;
-import org.apache.ignite.configuration.AtomicConfiguration;
-import org.apache.ignite.internal.processors.cache.datastructures.IgniteAtomicsAbstractTest;
-import org.apache.ignite.internal.processors.datastructures.GridCacheAtomicSequenceImpl;
-import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.internal.processors.cache.datastructures.GridCacheAtomicSequenceMultiThreadedAbstractTest;
 
 import static org.apache.ignite.cache.CacheMode.PARTITIONED;
 
 /**
  * Cache partitioned multi-threaded tests.
  */
-public class GridCachePartitionedAtomicSequenceMultiThreadedTest extends IgniteAtomicsAbstractTest {
-    /** Number of threads for multithreaded test. */
-    private static final int THREAD_NUM = 30;
-
-    /** Number of iterations per thread for multithreaded test. */
-    private static final int ITERATION_NUM = 4000;
-
+public class GridCachePartitionedAtomicSequenceMultiThreadedTest extends GridCacheAtomicSequenceMultiThreadedAbstractTest {
     /** {@inheritDoc} */
     @Override protected CacheMode atomicsCacheMode() {
         return PARTITIONED;
     }
-
-    /** {@inheritDoc} */
-    @Override protected int gridCount() {
-        return 1;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected AtomicConfiguration atomicConfiguration() {
-        AtomicConfiguration cfg = super.atomicConfiguration();
-
-        cfg.setBackups(1);
-        cfg.setAtomicSequenceReserveSize(10);
-
-        return cfg;
-    }
-
-    /** @throws Exception If failed. */
-    public void testValues() throws Exception {
-        String seqName = UUID.randomUUID().toString();
-
-        final GridCacheAtomicSequenceImpl seq = (GridCacheAtomicSequenceImpl)grid(0).atomicSequence(seqName, 0, true);
-
-        // Local reservations.
-        assertEquals(1, seq.incrementAndGet());
-        assertEquals(1, seq.getAndIncrement()); // Seq = 2
-        assertEquals(3L, seq.incrementAndGet());
-        assertEquals(3L, seq.getAndIncrement()); // Seq=4
-
-        assertEquals(4, seq.getAndAdd(3));
-        assertEquals(9, seq.addAndGet(2));
-
-        assertEquals(new Long(9L), U.field(seq, "locVal"));
-        assertEquals(new Long(9L), U.field(seq, "upBound"));
-
-        // Cache calls.
-        assertEquals(10, seq.incrementAndGet());
-
-        assertEquals(new Long(10L), U.field(seq, "locVal"));
-        assertEquals(new Long(19L), U.field(seq, "upBound"));
-
-        seq.addAndGet(9);
-
-        assertEquals(new Long(19L), U.field(seq, "locVal"));
-        assertEquals(new Long(19L), U.field(seq, "upBound"));
-
-        assertEquals(20L, seq.incrementAndGet());
-
-        assertEquals(new Long(20L), U.field(seq, "locVal"));
-        assertEquals(new Long(29L), U.field(seq, "upBound"));
-
-        seq.addAndGet(9);
-
-        assertEquals(new Long(29L), U.field(seq, "locVal"));
-        assertEquals(new Long(29L), U.field(seq, "upBound"));
-
-        assertEquals(29, seq.getAndIncrement());
-
-        assertEquals(new Long(30L), U.field(seq, "locVal"));
-        assertEquals(new Long(39L), U.field(seq, "upBound"));
-
-        seq.addAndGet(9);
-
-        assertEquals(new Long(39L), U.field(seq, "locVal"));
-        assertEquals(new Long(39L), U.field(seq, "upBound"));
-
-        assertEquals(39L, seq.getAndIncrement());
-
-        assertEquals(new Long(40L), U.field(seq, "locVal"));
-        assertEquals(new Long(49L), U.field(seq, "upBound"));
-
-        seq.addAndGet(9);
-
-        assertEquals(new Long(49L), U.field(seq, "locVal"));
-        assertEquals(new Long(49L), U.field(seq, "upBound"));
-
-        assertEquals(50, seq.addAndGet(1));
-
-        assertEquals(new Long(50L), U.field(seq, "locVal"));
-        assertEquals(new Long(59L), U.field(seq, "upBound"));
-
-        seq.addAndGet(9);
-
-        assertEquals(new Long(59L), U.field(seq, "locVal"));
-        assertEquals(new Long(59L), U.field(seq, "upBound"));
-
-        assertEquals(59, seq.getAndAdd(1));
-
-        assertEquals(new Long(60L), U.field(seq, "locVal"));
-        assertEquals(new Long(69L), U.field(seq, "upBound"));
-    }
-
-    /** @throws Exception If failed. */
-    public void testUpdatedSync() throws Exception {
-        checkUpdate(true);
-    }
-
-    /** @throws Exception If failed. */
-    public void testPreviousSync() throws Exception {
-        checkUpdate(false);
-    }
-
-    /** @throws Exception If failed. */
-    public void testIncrementAndGet() throws Exception {
-        // Random sequence names.
-        String seqName = UUID.randomUUID().toString();
-
-        final IgniteAtomicSequence seq = grid(0).atomicSequence(seqName, 0L, true);
-
-        runSequenceClosure(new GridInUnsafeClosure<IgniteAtomicSequence>() {
-            @Override public void apply(IgniteAtomicSequence t) {
-                t.incrementAndGet();
-            }
-        }, seq, ITERATION_NUM, THREAD_NUM);
-
-        assertEquals(ITERATION_NUM * THREAD_NUM, seq.get());
-    }
-
-    /** @throws Exception If failed. */
-    public void testIncrementAndGetAsync() throws Exception {
-        // Random sequence names.
-        String seqName = UUID.randomUUID().toString();
-
-        final IgniteAtomicSequence seq = grid(0).atomicSequence(seqName, 0L, true);
-
-        runSequenceClosure(new GridInUnsafeClosure<IgniteAtomicSequence>() {
-            @Override public void apply(IgniteAtomicSequence t) {
-                t.incrementAndGet();
-            }
-        }, seq, ITERATION_NUM, THREAD_NUM);
-
-        assertEquals(ITERATION_NUM * THREAD_NUM, seq.get());
-    }
-
-    /** @throws Exception If failed. */
-    public void testGetAndIncrement() throws Exception {
-        // Random sequence names.
-        String seqName = UUID.randomUUID().toString();
-
-        final IgniteAtomicSequence seq = grid(0).atomicSequence(seqName, 0L, true);
-
-        runSequenceClosure(new GridInUnsafeClosure<IgniteAtomicSequence>() {
-            @Override public void apply(IgniteAtomicSequence t) {
-                t.getAndIncrement();
-            }
-        }, seq, ITERATION_NUM, THREAD_NUM);
-
-        assertEquals(ITERATION_NUM * THREAD_NUM, seq.get());
-    }
-
-    /** @throws Exception If failed. */
-    public void testGetAndIncrementAsync() throws Exception {
-        // Random sequence names.
-        String seqName = UUID.randomUUID().toString();
-
-        final IgniteAtomicSequence seq = grid(0).atomicSequence(seqName, 0L, true);
-
-        runSequenceClosure(new GridInUnsafeClosure<IgniteAtomicSequence>() {
-            @Override public void apply(IgniteAtomicSequence t) {
-                t.getAndIncrement();
-            }
-        }, seq, ITERATION_NUM, THREAD_NUM);
-
-        assertEquals(ITERATION_NUM * THREAD_NUM, seq.get());
-    }
-
-    /** @throws Exception If failed. */
-    public void testAddAndGet() throws Exception {
-        // Random sequence names.
-        String seqName = UUID.randomUUID().toString();
-
-        final IgniteAtomicSequence seq = grid(0).atomicSequence(seqName, 0L, true);
-
-        runSequenceClosure(new GridInUnsafeClosure<IgniteAtomicSequence>() {
-            @Override public void apply(IgniteAtomicSequence t) {
-                t.addAndGet(5);
-            }
-        }, seq, ITERATION_NUM, THREAD_NUM);
-
-        assertEquals(5 * ITERATION_NUM * THREAD_NUM, seq.get());
-    }
-
-    /** @throws Exception If failed. */
-    public void testGetAndAdd() throws Exception {
-        // Random sequence names.
-        String seqName = UUID.randomUUID().toString();
-
-        final IgniteAtomicSequence seq = grid(0).atomicSequence(seqName, 0L, true);
-
-        runSequenceClosure(new GridInUnsafeClosure<IgniteAtomicSequence>() {
-            @Override public void apply(IgniteAtomicSequence t) {
-                t.getAndAdd(5);
-            }
-        }, seq, ITERATION_NUM, THREAD_NUM);
-
-        assertEquals(5 * ITERATION_NUM * THREAD_NUM, seq.get());
-    }
-
-    /** @throws Exception If failed. */
-    public void testMixed1() throws Exception {
-        // Random sequence names.
-        String seqName = UUID.randomUUID().toString();
-
-        final IgniteAtomicSequence seq = grid(0).atomicSequence(seqName, 0L, true);
-
-        runSequenceClosure(new GridInUnsafeClosure<IgniteAtomicSequence>() {
-            @Override public void apply(IgniteAtomicSequence t) {
-                t.incrementAndGet();
-                t.getAndIncrement();
-                t.incrementAndGet();
-                t.getAndIncrement();
-                t.getAndAdd(3);
-                t.addAndGet(3);
-            }
-        }, seq, ITERATION_NUM, THREAD_NUM);
-
-        assertEquals(10 * ITERATION_NUM * THREAD_NUM, seq.get());
-    }
-
-    /** @throws Exception If failed. */
-    public void testMixed2() throws Exception {
-        // Random sequence names.
-        String seqName = UUID.randomUUID().toString();
-
-        final IgniteAtomicSequence seq = grid(0).atomicSequence(seqName, 0L, true);
-
-        runSequenceClosure(new GridInUnsafeClosure<IgniteAtomicSequence>() {
-            @Override public void apply(IgniteAtomicSequence t) {
-                t.getAndAdd(2);
-                t.addAndGet(3);
-                t.addAndGet(5);
-                t.getAndAdd(7);
-            }
-        }, seq, ITERATION_NUM, THREAD_NUM);
-
-        assertEquals(17 * ITERATION_NUM * THREAD_NUM, seq.get());
-    }
-
-    /**
-     * Executes given closure in a given number of threads given number of times.
-     *
-     * @param c Closure to execute.
-     * @param seq Sequence to pass into closure.
-     * @param cnt Count of iterations per thread.
-     * @param threadCnt Thread count.
-     * @throws Exception If failed.
-     */
-    protected void runSequenceClosure(final GridInUnsafeClosure<IgniteAtomicSequence> c,
-        final IgniteAtomicSequence seq, final int cnt, final int threadCnt) throws Exception {
-        multithreaded(new Runnable() {
-            @Override public void run() {
-                try {
-                    for (int i = 0; i < cnt; i++)
-                        c.apply(seq);
-                }
-                catch (IgniteCheckedException e) {
-                    throw new RuntimeException(e);
-                }
-            }
-        }, threadCnt);
-    }
-
-    /**
-     * @param updated Whether use updated values.
-     * @throws Exception If failed.
-     */
-    @SuppressWarnings("IfMayBeConditional")
-    private void checkUpdate(boolean updated) throws Exception {
-        String seqName = UUID.randomUUID().toString();
-
-        final IgniteAtomicSequence seq = grid(0).atomicSequence(seqName, 0L, true);
-
-        long curVal = 0;
-
-        Random r = new Random();
-
-        for (int i = 0; i < ITERATION_NUM; i++) {
-            long delta = r.nextInt(10) + 1;
-
-            long retVal = updated ? seq.addAndGet(delta) : seq.getAndAdd(delta);
-
-            assertEquals(updated ? curVal + delta : curVal, retVal);
-
-            curVal += delta;
-        }
-    }
-
-    /**
-     * Closure that throws exception.
-     *
-     * @param <E> Closure argument type.
-     */
-    private abstract static class GridInUnsafeClosure<E> {
-        public abstract void apply(E p) throws IgniteCheckedException;
-    }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/784958bf/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedSequenceApiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedSequenceApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedSequenceApiSelfTest.java
index adc2ab3..ae03348 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedSequenceApiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedSequenceApiSelfTest.java
@@ -40,4 +40,8 @@ public class GridCachePartitionedSequenceApiSelfTest extends GridCacheSequenceAp
 
         return atomicCfg;
     }
+
+    @Override protected int gridCount() {
+        return 3; // NOTE(review): presumably 3 so the multi-node sequence tests are not skipped; confirm.
+    }
 }
\ No newline at end of file


Mime
View raw message