ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [6/8] ignite git commit: IGNITE-4927 Write behind - add an option to skip write coalescing
Date Tue, 18 Apr 2017 15:09:46 GMT
IGNITE-4927 Write behind - add an option to skip write coalescing

Signed-off-by: nikolay_tikhonov <ntikhonov@gridgain.com>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/22580e19
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/22580e19
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/22580e19

Branch: refs/heads/ignite-1561-1
Commit: 22580e19b7ae5d11b8c299e2b3d92f5c8b9f0e8c
Parents: c4d8180
Author: Alexander Belyak <alexandr.belyak@xored.com>
Authored: Tue Apr 18 14:56:50 2017 +0300
Committer: nikolay_tikhonov <ntikhonov@gridgain.com>
Committed: Tue Apr 18 15:57:45 2017 +0300

----------------------------------------------------------------------
 .../configuration/CacheConfiguration.java       |  33 +
 .../cache/store/GridCacheWriteBehindStore.java  | 614 ++++++++++++++++---
 ...idCacheWriteBehindStoreAbstractSelfTest.java |  24 +-
 .../GridCacheWriteBehindStoreAbstractTest.java  |   4 +
 ...heWriteBehindStoreMultithreadedSelfTest.java |  88 ++-
 .../GridCacheWriteBehindStoreSelfTest.java      | 159 ++++-
 ...ClientWriteBehindStoreNonCoalescingTest.java | 175 ++++++
 .../IgniteCacheWriteBehindTestSuite.java        |   2 +
 8 files changed, 978 insertions(+), 121 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/22580e19/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
index b5afba4..2308a10 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
@@ -176,6 +176,9 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
     /** Default batch size for write-behind cache store. */
     public static final int DFLT_WRITE_BEHIND_BATCH_SIZE = 512;
 
+    /** Default write coalescing for write-behind cache store. */
+    public static final boolean DFLT_WRITE_BEHIND_COALESCING = true;
+
     /** Default maximum number of query iterators that can be stored. */
     public static final int DFLT_MAX_QUERY_ITERATOR_CNT = 1024;
 
@@ -310,6 +313,9 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
     /** Maximum batch size for write-behind cache store. */
     private int writeBehindBatchSize = DFLT_WRITE_BEHIND_BATCH_SIZE;
 
+    /** Write coalescing flag for write-behind cache store. */
+    private boolean writeBehindCoalescing = DFLT_WRITE_BEHIND_COALESCING;
+
     /** Maximum number of query iterators that can be stored. */
     private int maxQryIterCnt = DFLT_MAX_QUERY_ITERATOR_CNT;
 
@@ -454,6 +460,7 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
         tmLookupClsName = cc.getTransactionManagerLookupClassName();
         topValidator = cc.getTopologyValidator();
         writeBehindBatchSize = cc.getWriteBehindBatchSize();
+        writeBehindCoalescing = cc.getWriteBehindCoalescing();
         writeBehindEnabled = cc.isWriteBehindEnabled();
         writeBehindFlushFreq = cc.getWriteBehindFlushFrequency();
         writeBehindFlushSize = cc.getWriteBehindFlushSize();
@@ -1287,6 +1294,32 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
     }
 
     /**
+     * Write coalescing flag for write-behind cache store operations. Store operations (put or remove)
+     * with the same key are combined or coalesced into a single resulting operation
+     * to reduce pressure on the underlying cache store.
+     * <p/>
+     * If not provided, default value is {@link #DFLT_WRITE_BEHIND_COALESCING}.
+     *
+     * @return Write coalescing flag.
+     */
+    public boolean getWriteBehindCoalescing() {
+        return writeBehindCoalescing;
+    }
+
+    /**
+     * Sets write coalescing flag for write-behind cache.
+     *
+     * @param writeBehindCoalescing Write coalescing flag.
+     * @see #getWriteBehindCoalescing()
+     * @return {@code this} for chaining.
+     */
+    public CacheConfiguration<K, V> setWriteBehindCoalescing(boolean writeBehindCoalescing) {
+        this.writeBehindCoalescing = writeBehindCoalescing;
+
+        return this;
+    }
+
+    /**
      * Use {@link IgniteConfiguration#getRebalanceThreadPoolSize()} instead.
      *
      * @return Size of rebalancing thread pool.

http://git-wip-us.apache.org/repos/asf/ignite/blob/22580e19/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStore.java
index 91008ce..64238ab 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStore.java
@@ -17,17 +17,19 @@
 
 package org.apache.ignite.internal.processors.cache.store;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.LockSupport;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import javax.cache.integration.CacheWriterException;
@@ -43,9 +45,11 @@ import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.internal.util.worker.GridWorker;
 import org.apache.ignite.lang.IgniteBiInClosure;
+import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lifecycle.LifecycleAware;
 import org.apache.ignite.thread.IgniteThread;
 import org.jetbrains.annotations.Nullable;
+import org.jsr166.ConcurrentLinkedDeque8;
 import org.jsr166.ConcurrentLinkedHashMap;
 
 import static javax.cache.Cache.Entry;
@@ -65,6 +69,8 @@ import static javax.cache.Cache.Entry;
  * <p/>
  * Since write operations to the cache store are deferred, transaction support is lost; no
  * transaction objects are passed to the underlying store.
+ * <p/>
+ * {@link GridCacheWriteBehindStore} doesn't support concurrent modifications of the same key.
  */
 public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, LifecycleAware {
     /** Default write cache initial capacity. */
@@ -91,6 +97,9 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy
     /** Count of worker threads performing underlying store updates. */
     private int flushThreadCnt = CacheConfiguration.DFLT_WRITE_FROM_BEHIND_FLUSH_THREAD_CNT;
 
+    /** Is flush threads count power of two flag. */
+    private boolean flushThreadCntIsPowerOfTwo;
+
     /** Cache flush frequency. All pending operations will be performed in not less then this value ms. */
     private long cacheFlushFreq = CacheConfiguration.DFLT_WRITE_BEHIND_FLUSH_FREQUENCY;
 
@@ -98,29 +107,26 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy
     private int batchSize = CacheConfiguration.DFLT_WRITE_BEHIND_BATCH_SIZE;
 
     /** Ignite instance name. */
-    private String igniteInstanceName;
+    private final String igniteInstanceName;
 
     /** Cache name. */
-    private String cacheName;
+    private final String cacheName;
 
     /** Underlying store. */
-    private CacheStore<K, V> store;
+    private final CacheStore<K, V> store;
 
     /** Write cache. */
     private ConcurrentLinkedHashMap<K, StatefulValue<K, V>> writeCache;
 
     /** Flusher threads. */
-    private GridWorker[] flushThreads;
+    private Flusher[] flushThreads;
+
+    /** Write coalescing. */
+    private boolean writeCoalescing = CacheConfiguration.DFLT_WRITE_BEHIND_COALESCING;
 
     /** Atomic flag indicating store shutdown. */
     private AtomicBoolean stopping = new AtomicBoolean(true);
 
-    /** Flush lock. */
-    private Lock flushLock = new ReentrantLock();
-
-    /** Condition to determine records available for flush. */
-    private Condition canFlush = flushLock.newCondition();
-
     /** Variable for counting total cache overflows. */
     private AtomicInteger cacheTotalOverflowCntr = new AtomicInteger();
 
@@ -131,10 +137,16 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy
     private AtomicInteger retryEntriesCnt = new AtomicInteger();
 
     /** Log. */
-    private IgniteLogger log;
+    private final IgniteLogger log;
 
     /** Store manager. */
-    private CacheStoreManager storeMgr;
+    private final CacheStoreManager storeMgr;
+
+    /** Flush lock. */
+    private final Lock flushLock = new ReentrantLock();
+
+    /** Condition to determine records available for flush. */
+    private Condition canFlush = flushLock.newCondition();
 
     /**
      * Creates a write-behind cache store for the given store.
@@ -193,7 +205,7 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy
      * <p/>
      * If this value is {@code 0}, then flush is performed only on time-elapsing basis. However,
      * when this value is {@code 0}, the cache critical size is set to
-     * {@link CacheConfiguration#DFLT_WRITE_BEHIND_CRITICAL_SIZE}
+     * {@link CacheConfiguration#DFLT_WRITE_BEHIND_CRITICAL_SIZE}.
      *
      * @return Buffer size that triggers flush procedure.
      */
@@ -208,6 +220,7 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy
      */
     public void setFlushThreadCount(int flushThreadCnt) {
         this.flushThreadCnt = flushThreadCnt;
+        this.flushThreadCntIsPowerOfTwo = (flushThreadCnt & (flushThreadCnt - 1)) == 0;
     }
 
     /**
@@ -220,6 +233,24 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy
     }
 
     /**
+     * Sets the write coalescing flag.
+     *
+     * @param writeCoalescing Write coalescing flag.
+     */
+    public void setWriteCoalescing(boolean writeCoalescing) {
+        this.writeCoalescing = writeCoalescing;
+    }
+
+    /**
+     * Gets the write coalescing flag.
+     *
+     * @return Write coalescing flag.
+     */
+    public boolean getWriteCoalescing() {
+        return writeCoalescing;
+    }
+
+    /**
      * Sets the cache flush frequency. All pending operations on the underlying store will be performed
      * within time interval not less then this value.
      *
@@ -266,7 +297,16 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy
      * @return Total count of entries in cache store internal buffer.
      */
     public int getWriteBehindBufferSize() {
-        return writeCache.sizex();
+        if (writeCoalescing)
+            return writeCache.sizex();
+        else {
+            int size = 0;
+
+            for (Flusher f : flushThreads)
+                size += f.size();
+
+            return size;
+        }
     }
 
     /**
@@ -292,14 +332,15 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy
             if (cacheCriticalSize == 0)
                 cacheCriticalSize = CacheConfiguration.DFLT_WRITE_BEHIND_CRITICAL_SIZE;
 
-            flushThreads = new GridWorker[flushThreadCnt];
+            flushThreads = new GridCacheWriteBehindStore.Flusher[flushThreadCnt];
 
-            writeCache = new ConcurrentLinkedHashMap<>(initCap, 0.75f, concurLvl);
+            if (writeCoalescing)
+                writeCache = new ConcurrentLinkedHashMap<>(initCap, 0.75f, concurLvl);
 
             for (int i = 0; i < flushThreads.length; i++) {
                 flushThreads[i] = new Flusher(igniteInstanceName, "flusher-" + i, log);
 
-                new IgniteThread(flushThreads[i]).start();
+                flushThreads[i].start();
             }
         }
     }
@@ -344,7 +385,10 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy
             if (log.isDebugEnabled())
                 log.debug("Stopping write-behind store for cache '" + cacheName + '\'');
 
-            wakeUp();
+            for (Flusher f : flushThreads) {
+                if (!f.isEmpty())
+                    f.wakeUp();
+            }
 
             boolean graceful = true;
 
@@ -352,7 +396,7 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy
                 graceful &= U.join(worker, log);
 
             if (!graceful)
-                log.warning("Shutdown was aborted");
+                log.warning("Write behind store shutdown was aborted.");
         }
     }
 
@@ -361,7 +405,10 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy
      * @throws IgniteCheckedException If failed.
      */
     public void forceFlush() throws IgniteCheckedException {
-        wakeUp();
+        for (Flusher f : flushThreads) {
+            if (!f.isEmpty())
+                f.wakeUp();
+        }
     }
 
     /** {@inheritDoc} */
@@ -376,10 +423,15 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy
 
         Map<K, V> loaded = new HashMap<>();
 
-        Collection<K> remaining = new LinkedList<>();
+        Collection<K> remaining = null;
 
         for (K key : keys) {
-            StatefulValue<K, V> val = writeCache.get(key);
+            StatefulValue<K, V> val;
+
+            if (writeCoalescing)
+                val = writeCache.get(key);
+            else
+                val = flusher(key).flusherWriteMap.get(key);
 
             if (val != null) {
                 val.readLock().lock();
@@ -394,12 +446,16 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy
                     val.readLock().unlock();
                 }
             }
-            else
+            else {
+                if (remaining == null)
+                    remaining = new ArrayList<>();
+
                 remaining.add(key);
+            }
         }
 
         // For items that were not found in queue.
-        if (!remaining.isEmpty()) {
+        if (remaining != null && !remaining.isEmpty()) {
             Map<K, V> loaded0 = store.loadAll(remaining);
 
             if (loaded0 != null)
@@ -414,7 +470,12 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy
         if (log.isDebugEnabled())
             log.debug("Store load [key=" + key + ']');
 
-        StatefulValue<K, V> val = writeCache.get(key);
+        StatefulValue<K, V> val;
+
+        if (writeCoalescing)
+            val = writeCache.get(key);
+        else
+            val = flusher(key).flusherWriteMap.get(key);
 
         if (val != null) {
             val.readLock().lock();
@@ -493,7 +554,7 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy
      *
      * @param key Key for which update is performed.
      * @param val New value, may be null for remove operation.
-     * @param operation Updated value status
+     * @param operation Updated value status.
      * @throws IgniteInterruptedCheckedException If interrupted while waiting for value to be flushed.
      */
     private void updateCache(K key,
@@ -502,8 +563,27 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy
         throws IgniteInterruptedCheckedException {
         StatefulValue<K, V> newVal = new StatefulValue<>(val, operation);
 
+        if (writeCoalescing)
+            putToWriteCache(key, newVal);
+        else
+            flusher(key).putToFlusherWriteCache(key, newVal);
+    }
+
+    /**
+     * Performs flush-consistent writeCache update for the given key.
+     *
+     * @param key Key for which update is performed.
+     * @param newVal stateful value to put
+     * @throws IgniteInterruptedCheckedException If interrupted while waiting for value to be flushed.
+     */
+    private void putToWriteCache(
+        K key,
+        StatefulValue<K, V> newVal)
+        throws IgniteInterruptedCheckedException {
         StatefulValue<K, V> prev;
 
+        assert writeCoalescing : "Unexpected write coalescing.";
+
         while ((prev = writeCache.putIfAbsent(key, newVal)) != null) {
             prev.writeLock().lock();
 
@@ -523,7 +603,7 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy
 
                 assert prev.status() == ValueStatus.NEW || prev.status() == ValueStatus.RETRY;
 
-                prev.update(val, operation, ValueStatus.NEW);
+                prev.update(newVal.val, newVal.operation(), ValueStatus.NEW);
 
                 break;
             }
@@ -533,14 +613,33 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy
         }
 
         // Now check the map size
-        if (writeCache.sizex() > cacheCriticalSize)
+        int cacheSize = getWriteBehindBufferSize();
+
+        if (cacheSize > cacheCriticalSize)
             // Perform single store update in the same thread.
             flushSingleValue();
-        else if (cacheMaxSize > 0 && writeCache.sizex() > cacheMaxSize)
+        else if (cacheMaxSize > 0 && cacheSize > cacheMaxSize)
             wakeUp();
     }
 
     /**
+     * Returns the flusher responsible for the given key.
+     *
+     * @param key Key for search.
+     * @return flusher.
+     */
+    private Flusher flusher(K key) {
+        int h, idx;
+
+        if (flushThreadCntIsPowerOfTwo)
+            idx = ((h = key.hashCode()) ^ (h >>> 16)) & (flushThreadCnt - 1);
+        else
+            idx = ((h = key.hashCode()) ^ (h >>> 16)) % flushThreadCnt;
+
+        return flushThreads[idx];
+    }
+
+    /**
      * Flushes one upcoming value to the underlying store. Called from
      * {@link #updateCache(Object, Entry, StoreOperation)} method in case when current map size exceeds
      * critical size.
@@ -549,7 +648,7 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy
         cacheOverflowCntr.incrementAndGet();
 
         try {
-            Map<K, StatefulValue<K, V>> batch = null;
+            Map<K, StatefulValue<K, V>> batch;
 
             for (Map.Entry<K, StatefulValue<K, V>> e : writeCache.entrySet()) {
                 StatefulValue<K, V> val = e.getValue();
@@ -577,7 +676,7 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy
                 }
 
                 if (!batch.isEmpty()) {
-                    applyBatch(batch, false);
+                    applyBatch(batch, false, null);
 
                     cacheTotalOverflowCntr.incrementAndGet();
 
@@ -595,9 +694,12 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy
      *
      * @param valMap Batch map.
      * @param initSes {@code True} if need to initialize session.
+     * @param flusher Flusher associated with all keys in the batch (only meaningful when write coalescing is disabled).
+     * @return {@code True} if batch was successfully applied, {@code False} otherwise.
      */
-    private void applyBatch(Map<K, StatefulValue<K, V>> valMap, boolean initSes) {
+    private boolean applyBatch(Map<K, StatefulValue<K, V>> valMap, boolean initSes, Flusher flusher) {
         assert valMap.size() <= batchSize;
+        assert !valMap.isEmpty();
 
         StoreOperation operation = null;
 
@@ -615,7 +717,9 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy
             batch.put(e.getKey(), e.getValue().entry());
         }
 
-        if (updateStore(operation, batch, initSes)) {
+        boolean result = updateStore(operation, batch, initSes, flusher);
+
+        if (result) {
             for (Map.Entry<K, StatefulValue<K, V>> e : valMap.entrySet()) {
                 StatefulValue<K, V> val = e.getValue();
 
@@ -624,12 +728,22 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy
                 try {
                     val.status(ValueStatus.FLUSHED);
 
-                    StatefulValue<K, V> prev = writeCache.remove(e.getKey());
+                    if (writeCoalescing) {
+                        StatefulValue<K, V> prev = writeCache.remove(e.getKey());
 
-                    // Additional check to ensure consistency.
-                    assert prev == val : "Map value for key " + e.getKey() + " was updated during flush";
+                        // Additional check to ensure consistency.
+                        assert prev == val : "Map value for key " + e.getKey() + " was updated during flush";
 
-                    val.signalFlushed();
+                        val.signalFlushed();
+                    }
+                    else {
+                        Flusher f = flusher(e.getKey());
+
+                        // Can remove using equal because if map contains another similar value it has different state.
+                        f.flusherWriteMap.remove(e.getKey(), e.getValue());
+
+                        val.signalFlushed();
+                    }
                 }
                 finally {
                     val.writeLock().unlock();
@@ -653,6 +767,8 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy
                 }
             }
         }
+
+        return result;
     }
 
     /**
@@ -666,13 +782,16 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy
      * @param operation Status indicating operation that should be performed.
      * @param vals Key-Value map.
      * @param initSes {@code True} if need to initialize session.
+     * @param flusher Flusher associated with the keys in {@code vals} (used when write coalescing is disabled).
      * @return {@code true} if value may be deleted from the write cache,
      *         {@code false} otherwise
      */
-    private boolean updateStore(StoreOperation operation,
+    private boolean updateStore(
+        StoreOperation operation,
         Map<K, Entry<? extends K, ? extends  V>> vals,
-        boolean initSes) {
-
+        boolean initSes,
+        Flusher flusher
+    ) {
         try {
             if (initSes && storeMgr != null)
                 storeMgr.writeBehindSessionInit();
@@ -707,7 +826,14 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy
         catch (Exception e) {
             LT.error(log, e, "Unable to update underlying store: " + store);
 
-            if (writeCache.sizex() > cacheCriticalSize || stopping.get()) {
+            boolean overflow;
+
+            if (writeCoalescing)
+                overflow = writeCache.sizex() > cacheCriticalSize || stopping.get();
+            else
+                overflow = flusher.isOverflowed() || stopping.get();
+
+            if (overflow) {
                 for (Map.Entry<K, Entry<? extends K, ? extends  V>> entry : vals.entrySet()) {
                     Object val = entry.getValue() != null ? entry.getValue().getValue() : null;
 
@@ -738,29 +864,163 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy
     }
 
     /**
-     * Thread that performs time-based flushing of written values to the underlying storage.
+     * Thread that performs time/size-based flushing of written values to the underlying storage.
      */
     private class Flusher extends GridWorker {
+        /** Queue to flush. */
+        private final ConcurrentLinkedDeque8<IgniteBiTuple<K, StatefulValue<K,V>>> queue;
+
+        /** Flusher write map. */
+        private final ConcurrentHashMap<K, StatefulValue<K,V>> flusherWriteMap;
+
+        /** Critical size of flusher local queue. */
+        private final int flusherCacheCriticalSize;
+
+        /** Flusher parked flag. */
+        private volatile boolean parked;
+
+        /** Flusher thread. */
+        protected Thread thread;
+
+        /** Cache flushing frequency in nanos. NOTE(review): ms * 1000 below yields micros, not nanos - confirm. */
+        protected long cacheFlushFreqNanos = cacheFlushFreq * 1000;
+
+        /** Writer lock. */
+        private final Lock flusherWriterLock = new ReentrantLock();
+
+        /** Condition to determine available space for flush. */
+        private Condition flusherWriterCanWrite = flusherWriterLock.newCondition();
+
         /** {@inheritDoc */
-        protected Flusher(String igniteInstanceName, String name, IgniteLogger log) {
+        protected Flusher(String igniteInstanceName,
+            String name,
+            IgniteLogger log) {
             super(igniteInstanceName, name, log);
+
+            flusherCacheCriticalSize = cacheCriticalSize/flushThreadCnt;
+
+            assert flusherCacheCriticalSize > batchSize;
+
+            if (writeCoalescing) {
+                queue = null;
+                flusherWriteMap = null;
+            }
+            else {
+                queue = new ConcurrentLinkedDeque8<>();
+                flusherWriteMap = new ConcurrentHashMap<>(initCap, 0.75f, concurLvl);
+            }
+        }
+
+        /** Start flusher thread */
+        protected void start() {
+            thread = new IgniteThread(this);
+            thread.start();
+        }
+
+        /**
+         * Performs flush-consistent flusher writeCache update for the given key.
+         *
+         * @param key Key for which update is performed.
+         * @param newVal stateful value to put
+         * @throws IgniteInterruptedCheckedException If interrupted while waiting for value to be flushed.
+         */
+        private void putToFlusherWriteCache(
+            K key,
+            StatefulValue<K, V> newVal)
+            throws IgniteInterruptedCheckedException {
+            assert !writeCoalescing : "Unexpected write coalescing.";
+
+            if (queue.sizex() > flusherCacheCriticalSize) {
+                while (queue.sizex() > flusherCacheCriticalSize) {
+                    wakeUp();
+
+                    flusherWriterLock.lock();
+
+                    try {
+                        // Wait for free space in flusher queue
+                        while (queue.sizex() >= flusherCacheCriticalSize && !stopping.get()) {
+                            if (cacheFlushFreq > 0)
+                                flusherWriterCanWrite.await(cacheFlushFreq, TimeUnit.MILLISECONDS);
+                            else
+                                flusherWriterCanWrite.await();
+                        }
+
+                        cacheTotalOverflowCntr.incrementAndGet();
+                    }
+                    catch (InterruptedException e) {
+                        if (log.isDebugEnabled())
+                            log.debug("Caught interrupted exception: " + e);
+
+                        Thread.currentThread().interrupt();
+                    }
+                    finally {
+                        flusherWriterLock.unlock();
+                    }
+                }
+
+                cacheTotalOverflowCntr.incrementAndGet();
+            }
+
+            queue.add(F.t(key, newVal));
+
+            flusherWriteMap.put(key, newVal);
+        }
+
+        /**
+         * Get overflowed flag.
+         *
+         * @return {@code True} if write behind flusher is overflowed,
+         *         {@code False} otherwise.
+         */
+        public boolean isOverflowed() {
+            if (writeCoalescing)
+                return writeCache.sizex() > cacheCriticalSize;
+            else
+                return queue.sizex() > flusherCacheCriticalSize;
+        }
+
+        /**
+         * Get write behind flusher size.
+         *
+         * @return Flusher write behind size.
+         */
+        public int size() {
+            return writeCoalescing ? writeCache.sizex() : queue.sizex();
+        }
+
+        /**
+         * Test if write behind flusher is empty
+         *
+         * @return {@code True} if write behind flusher is empty, {@code False} otherwise
+         */
+        public boolean isEmpty() {
+            return writeCoalescing ? writeCache.isEmpty() : queue.isEmpty();
         }
 
         /** {@inheritDoc} */
         @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
-            while (!stopping.get() || writeCache.sizex() > 0) {
-                awaitOperationsAvailable();
+            if (writeCoalescing) {
+                while (!stopping.get() || writeCache.sizex() > 0) {
+                    awaitOperationsAvailableCoalescing();
 
-                flushCache(writeCache.entrySet().iterator());
+                    flushCacheCoalescing();
+                }
+            }
+            else {
+                while (!stopping.get() || queue.sizex() > 0) {
+                    awaitOperationsAvailableNonCoalescing();
+
+                    flushCacheNonCoalescing();
+                }
             }
         }
 
         /**
-         * This method awaits until enough elements in map are available or given timeout is over.
+         * This method awaits until enough elements in flusher queue are available or given timeout is over.
          *
          * @throws InterruptedException If awaiting was interrupted.
          */
-        private void awaitOperationsAvailable() throws InterruptedException {
+        private void awaitOperationsAvailableCoalescing() throws InterruptedException {
             flushLock.lock();
 
             try {
@@ -780,74 +1040,215 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy
         }
 
         /**
+         * This method awaits until enough elements in flusher queue are available or given timeout is over.
+         *
+         * @throws InterruptedException If awaiting was interrupted.
+         */
+        private void awaitOperationsAvailableNonCoalescing() throws InterruptedException {
+            if (queue.sizex() >= batchSize)
+                return;
+
+            parked = true;
+
+            try {
+                for (;;) {
+                    if (queue.sizex() >= batchSize)
+                        return;
+
+                    if (cacheFlushFreq > 0)
+                        LockSupport.parkNanos(cacheFlushFreqNanos);
+                    else
+                        LockSupport.park();
+
+                    if (queue.sizex() > 0)
+                        return;
+
+                    if (Thread.interrupted())
+                        throw new InterruptedException();
+
+                    if (stopping.get())
+                        return;
+                }
+            }
+            finally {
+                parked = false;
+            }
+        }
+
+        /**
+         * Wake up flusher thread.
+         */
+        public void wakeUp() {
+            if (parked)
+                LockSupport.unpark(thread);
+        }
+
+        /**
          * Removes values from the write cache and performs corresponding operation
          * on the underlying store.
-         *
-         * @param it Iterator for write cache.
          */
-        private void flushCache(Iterator<Map.Entry<K,StatefulValue<K, V>>> it) {
-            StoreOperation operation = null;
+        private void flushCacheCoalescing() {
+            StoreOperation prevOperation = null;
 
-            Map<K, StatefulValue<K, V>> batch = null;
-            Map<K, StatefulValue<K, V>> pending  = U.newLinkedHashMap(batchSize);
+            Map<K, StatefulValue<K, V>> pending = U.newLinkedHashMap(batchSize);
+            Iterator<Map.Entry<K, StatefulValue<K, V>>> it = writeCache.entrySet().iterator();
 
             while (it.hasNext()) {
                 Map.Entry<K, StatefulValue<K, V>> e = it.next();
-
                 StatefulValue<K, V> val = e.getValue();
 
-                val.writeLock().lock();
+                if (!val.writeLock().tryLock()) // TODO: stripe write maps to avoid lock contention.
+                    continue;
 
                 try {
-                    ValueStatus status = val.status();
+                    BatchingResult addRes = tryAddStatefulValue(pending, prevOperation, e.getKey(), val);
 
-                    if (acquired(status))
-                        // Another thread is helping us, continue to the next entry.
-                        continue;
-
-                    if (status == ValueStatus.RETRY)
-                        retryEntriesCnt.decrementAndGet();
+                    switch (addRes) {
+                        case NEW_BATCH:
+                            applyBatch(pending, true, null);
 
-                    assert retryEntriesCnt.get() >= 0;
+                            pending = U.newLinkedHashMap(batchSize);
 
-                    val.status(ValueStatus.PENDING);
+                            // No need to test first value in batch
+                            val.status(ValueStatus.PENDING);
+                            pending.put(e.getKey(), val);
+                            prevOperation = val.operation();
 
-                    // We scan for the next operation and apply batch on operation change. Null means new batch.
-                    if (operation == null)
-                        operation = val.operation();
+                            break;
 
-                    if (operation != val.operation()) {
-                        // Operation is changed, so we need to perform a batch.
-                        batch = pending;
-                        pending = U.newLinkedHashMap(batchSize);
+                        case ADDED:
+                            prevOperation = val.operation();
 
-                        operation = val.operation();
+                            break;
 
-                        pending.put(e.getKey(), val);
+                        default:
+                            assert addRes == BatchingResult.SKIPPED : "Unexpected result: " + addRes;
                     }
-                    else
-                        pending.put(e.getKey(), val);
+                }
+                finally {
+                    val.writeLock().unlock();
+                }
+            }
+
+            // Process the remainder.
+            if (!pending.isEmpty())
+                applyBatch(pending, true, null);
+        }
+
+        /**
+         * Removes values from the flusher write queue and performs corresponding operation
+         * on the underlying store.
+         */
+        private void flushCacheNonCoalescing() {
+            StoreOperation prevOperation;
+            Map<K, StatefulValue<K, V>> pending;
+            IgniteBiTuple<K, StatefulValue<K, V>> tuple;
+            boolean applied;
+
+            while(!queue.isEmpty()) {
+                pending = U.newLinkedHashMap(batchSize);
+                prevOperation = null;
+                boolean needNewBatch = false;
+
+                // Collect batch
+                while (!needNewBatch && (tuple = queue.peek()) != null) {
+                    BatchingResult addRes = tryAddStatefulValue(pending, prevOperation, tuple.getKey(),
+                        tuple.getValue());
+
+                    switch (addRes) {
+                        case ADDED:
+                            prevOperation = tuple.getValue().operation();
+                            queue.poll();
+
+                            break;
+
+                        case SKIPPED:
+                            assert false : "Unexpected result: " + addRes;
+
+                            break;
 
-                    if (pending.size() == batchSize) {
-                        batch = pending;
-                        pending = U.newLinkedHashMap(batchSize);
+                        case NEW_BATCH:
+                            needNewBatch = true;
+                            prevOperation = null;
 
-                        operation = null;
+                            break;
+
+                        default:
+                            assert false : "Unexpected result: " + addRes;
                     }
                 }
-                finally {
-                    val.writeLock().unlock();
+
+                // Process collected batch
+                applied = applyBatch(pending, true, this);
+
+                if (applied) {
+                    // Wake up awaiting writers
+                    flusherWriterLock.lock();
+
+                    try {
+                        flusherWriterCanWrite.signalAll();
+                    }
+                    finally {
+                        flusherWriterLock.unlock();
+                    }
                 }
+                else {
+                    // Return values to queue
+                    ArrayList<Map.Entry<K, StatefulValue<K,V>>> pendingList = new ArrayList(pending.entrySet());
 
-                if (batch != null && !batch.isEmpty()) {
-                    applyBatch(batch, true);
-                    batch = null;
+                    for (int i = pendingList.size() - 1; i >= 0; i--)
+                        queue.addFirst(F.t(pendingList.get(i).getKey(), pendingList.get(i).getValue()));
                 }
             }
+        }
 
-            // Process the remainder.
-            if (!pending.isEmpty())
-                applyBatch(pending, true);
+        /**
+         * Tries to add a key and stateful value pair into the pending map.
+         *
+         * @param pending Map to populate.
+         * @param key Key to add.
+         * @param val Stateful value to add.
+         * @return {@code BatchingResult.ADDED} if the pair was successfully added,
+         *     {@code BatchingResult.SKIPPED} if the pair cannot be processed by this thread,
+         *     {@code BatchingResult.NEW_BATCH} if the pair requires a new batch (pending map) to be added.
+         */
+        public BatchingResult tryAddStatefulValue(
+            Map<K, StatefulValue<K, V>> pending,
+            StoreOperation prevOperation,
+            K key,
+            StatefulValue<K, V> val
+        ) {
+            ValueStatus status = val.status();
+
+            assert !(pending.isEmpty() && prevOperation != null) : "prev operation cannot be " + prevOperation
+                + " if prev map is empty!";
+
+            if (acquired(status))
+                // Another thread is helping us, continue to the next entry.
+                return BatchingResult.SKIPPED;
+
+            if (!writeCoalescing && pending.containsKey(key))
+                return BatchingResult.NEW_BATCH;
+
+            if (status == ValueStatus.RETRY)
+                retryEntriesCnt.decrementAndGet();
+
+            assert retryEntriesCnt.get() >= 0;
+
+            if (pending.size() == batchSize)
+                return BatchingResult.NEW_BATCH;
+
+            // We scan for the next operation and apply batch on operation change. Null means new batch.
+            if (prevOperation != val.operation() && prevOperation != null)
+                // Operation is changed, so we need to perform a batch.
+                return BatchingResult.NEW_BATCH;
+            else {
+                val.status(ValueStatus.PENDING);
+
+                pending.put(key, val);
+
+                return BatchingResult.ADDED;
+            }
         }
     }
 
@@ -861,6 +1262,20 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy
     }
 
     /**
+     * For test purposes only.
+     *
+     * @return Flusher maps for the underlying store operations.
+     */
+    Map<K, StatefulValue<K,V>>[] flusherMaps() {
+        Map<K, StatefulValue<K,V>>[] result = new Map[flushThreadCnt];
+
+        for (int i=0; i < flushThreadCnt; i++)
+            result[i] = flushThreads[i].flusherWriteMap;
+
+        return result;
+    }
+
+    /**
      * Enumeration that represents possible operations on the underlying store.
      */
     private enum StoreOperation {
@@ -889,6 +1304,20 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy
     }
 
     /**
+     * Enumeration that represents possible result of "add to batch" operation.
+     */
+    private enum BatchingResult {
+        /** Added to batch. */
+        ADDED,
+
+        /** Skipped. */
+        SKIPPED,
+
+        /** Need new batch. */
+        NEW_BATCH
+    }
+
+    /**
      * Checks if given status indicates pending or complete flush operation.
      *
      * @param status Status to check.
@@ -901,6 +1330,7 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy
     /**
      * A state-value-operation trio.
      *
+     * @param <K> Key type.
      * @param <V> Value type.
      */
     private static class StatefulValue<K, V> extends ReentrantReadWriteLock {
@@ -949,7 +1379,7 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy
         }
 
         /**
-         * @return Value status
+         * @return Value status.
          */
         private ValueStatus status() {
             return valStatus;
@@ -980,7 +1410,7 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy
         }
 
         /**
-         * Awaits a signal on flush condition
+         * Awaits a signal on flush condition.
          *
          * @throws IgniteInterruptedCheckedException If thread was interrupted.
          */
@@ -1023,4 +1453,4 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy
             return S.toString(StatefulValue.class, this);
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/22580e19/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreAbstractSelfTest.java
index 323278f..3bac906 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreAbstractSelfTest.java
@@ -20,12 +20,14 @@ package org.apache.ignite.internal.processors.cache.store;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.cache.CacheEntryImpl;
 import org.apache.ignite.internal.processors.cache.GridCacheTestStore;
@@ -59,16 +61,29 @@ public abstract class GridCacheWriteBehindStoreAbstractSelfTest extends GridComm
     /**
      * Initializes store.
      *
-     * @param flushThreadCnt Count of flush threads
+     * @param flushThreadCnt Count of flush threads.
      * @throws Exception If failed.
      */
     protected void initStore(int flushThreadCnt) throws Exception {
+        initStore(flushThreadCnt, CacheConfiguration.DFLT_WRITE_BEHIND_COALESCING);
+    }
+
+    /**
+     * Initializes store.
+     *
+     * @param flushThreadCnt Count of flush threads.
+     * @param writeCoalescing Write coalescing flag.
+     * @throws Exception If failed.
+     */
+    protected void initStore(int flushThreadCnt, boolean writeCoalescing) throws Exception {
         store = new GridCacheWriteBehindStore<>(null, "", "", log, delegate);
 
         store.setFlushFrequency(FLUSH_FREQUENCY);
 
         store.setFlushSize(CACHE_SIZE);
 
+        store.setWriteCoalescing(writeCoalescing);
+
         store.setFlushThreadCount(flushThreadCnt);
 
         delegate.reset();
@@ -83,8 +98,11 @@ public abstract class GridCacheWriteBehindStoreAbstractSelfTest extends GridComm
      */
     protected void shutdownStore() throws Exception {
         store.stop();
-
-        assertTrue("Store cache must be empty after shutdown", store.writeCache().isEmpty());
+        if (store.getWriteCoalescing())
+            assertTrue("Store cache must be empty after shutdown", store.writeCache().isEmpty());
+        else
+            for (Map<?,?> fMap : store.flusherMaps())
+                assertTrue("Store flusher cache must be empty after shutdown", fMap.isEmpty());
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/22580e19/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreAbstractTest.java
index ffdad5c..56ee760 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreAbstractTest.java
@@ -37,6 +37,10 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.apache.ignite.transactions.Transaction;
 import org.jetbrains.annotations.Nullable;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+import org.junit.runners.Parameterized.Parameter;
 
 import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
 import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;

http://git-wip-us.apache.org/repos/asf/ignite/blob/22580e19/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreMultithreadedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreMultithreadedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreMultithreadedSelfTest.java
index bc6b7bd..15c58d9 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreMultithreadedSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreMultithreadedSelfTest.java
@@ -21,6 +21,7 @@ import java.util.Collection;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.util.typedef.internal.U;
 
 /**
@@ -28,12 +29,30 @@ import org.apache.ignite.internal.util.typedef.internal.U;
  */
 public class GridCacheWriteBehindStoreMultithreadedSelfTest extends GridCacheWriteBehindStoreAbstractSelfTest {
     /**
+     * This test performs complex set of operations on store with coalescing from multiple threads.
+     *
+     * @throws Exception If failed.
+     */
+    public void testPutGetRemoveWithCoalescing() throws Exception {
+        testPutGetRemove(true);
+    }
+
+    /**
+     * This test performs complex set of operations on store without coalescing from multiple threads.
+     *
+     * @throws Exception If failed.
+     */
+    public void testPutGetRemoveWithoutCoalescing() throws Exception {
+        testPutGetRemove(false);
+    }
+
+    /**
      * This test performs complex set of operations on store from multiple threads.
      *
      * @throws Exception If failed.
      */
-    public void testPutGetRemove() throws Exception {
-        initStore(2);
+    private void testPutGetRemove(boolean writeCoalescing) throws Exception {
+        initStore(2, writeCoalescing);
 
         Set<Integer> exp;
 
@@ -63,26 +82,54 @@ public class GridCacheWriteBehindStoreMultithreadedSelfTest extends GridCacheWri
     }
 
     /**
+     * Tests that cache with write coalescing would keep values if underlying store fails.
+     *
+     * @throws Exception If failed.
+     */
+    public void testStoreFailureWithCoalescing() throws Exception {
+        testStoreFailure(true);
+    }
+
+    /**
+     * Tests that cache without write coalescing would keep values if underlying store fails.
+     *
+     * @throws Exception If failed.
+     */
+    public void testStoreFailureWithoutCoalescing() throws Exception {
+        testStoreFailure(false);
+    }
+
+    /**
      * Tests that cache would keep values if underlying store fails.
      *
      * @throws Exception If failed.
      */
-    public void testStoreFailure() throws Exception {
+    private void testStoreFailure(boolean writeCoalescing) throws Exception {
         delegate.setShouldFail(true);
 
-        initStore(2);
+        initStore(2, writeCoalescing);
 
         Set<Integer> exp;
 
         try {
+            Thread timer = new Thread(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        U.sleep(FLUSH_FREQUENCY+50);
+                    } catch (IgniteInterruptedCheckedException e) {
+                        assertTrue("Timer was interrupted", false);
+                    }
+                    delegate.setShouldFail(false);
+                }
+            });
+            timer.start();
             exp = runPutGetRemoveMultithreaded(10, 10);
 
-            U.sleep(FLUSH_FREQUENCY);
+            timer.join();
 
             info(">>> There are " + store.getWriteBehindErrorRetryCount() + " entries in RETRY state");
 
-            delegate.setShouldFail(false);
-
             // Despite that we set shouldFail flag to false, flush thread may just have caught an exception.
             // If we move store to the stopping state right away, this value will be lost. That's why this sleep
             // is inserted here to let all exception handlers in write-behind store exit.
@@ -111,16 +158,37 @@ public class GridCacheWriteBehindStoreMultithreadedSelfTest extends GridCacheWri
     }
 
     /**
+     * Tests store (with write coalescing) consistency in case of high put rate,
+     * when flush is performed from the same thread as put or remove operation.
+     *
+     * @throws Exception If failed.
+     */
+    public void testFlushFromTheSameThreadWithCoalescing() throws Exception {
+        testFlushFromTheSameThread(true);
+    }
+
+    /**
+     * Tests store (without write coalescing) consistency in case of high put rate,
+     * when flush is performed from the same thread as put or remove operation.
+     *
+     * @throws Exception If failed.
+     */
+    public void testFlushFromTheSameThreadWithoutCoalescing() throws Exception {
+        testFlushFromTheSameThread(false);
+    }
+
+    /**
      * Tests store consistency in case of high put rate, when flush is performed from the same thread
      * as put or remove operation.
      *
+     * @param writeCoalescing Write coalescing flag.
      * @throws Exception If failed.
      */
-    public void testFlushFromTheSameThread() throws Exception {
+    private void testFlushFromTheSameThread(boolean writeCoalescing) throws Exception {
         // 50 milliseconds should be enough.
         delegate.setOperationDelay(50);
 
-        initStore(2);
+        initStore(2, writeCoalescing);
 
         Set<Integer> exp;
 
@@ -162,4 +230,4 @@ public class GridCacheWriteBehindStoreMultithreadedSelfTest extends GridCacheWri
         for (Integer key : exp)
             assertEquals("Invalid value for key " + key, "val" + key, map.get(key));
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/22580e19/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreSelfTest.java
index 67e26ab..9a487a4 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreSelfTest.java
@@ -35,11 +35,30 @@ import org.jsr166.ConcurrentLinkedHashMap;
  */
 public class GridCacheWriteBehindStoreSelfTest extends GridCacheWriteBehindStoreAbstractSelfTest {
     /**
-     * Tests correct store shutdown when underlying store fails,
+     * Tests correct store (with write coalescing) shutdown when underlying store fails.
      *
      * @throws Exception If failed.
      */
-    public void testShutdownWithFailure() throws Exception {
+    public void testShutdownWithFailureWithCoalescing() throws Exception {
+        testShutdownWithFailure(true);
+    }
+
+    /**
+     * Tests correct store (without write coalescing) shutdown when underlying store fails.
+     *
+     * @throws Exception If failed.
+     */
+    public void testShutdownWithFailureWithoutCoalescing() throws Exception {
+        testShutdownWithFailure(false);
+    }
+
+    /**
+     * Tests correct store shutdown when underlying store fails.
+     *
+     * @param writeCoalescing Write coalescing flag.
+     * @throws Exception If failed.
+     */
+    private void testShutdownWithFailure(final boolean writeCoalescing) throws Exception {
         final AtomicReference<Exception> err = new AtomicReference<>();
 
         multithreadedAsync(new Runnable() {
@@ -47,7 +66,7 @@ public class GridCacheWriteBehindStoreSelfTest extends GridCacheWriteBehindStore
                 try {
                     delegate.setShouldFail(true);
 
-                    initStore(2);
+                    initStore(2, writeCoalescing);
 
                     try {
                         store.write(new CacheEntryImpl<>(1, "val1"));
@@ -70,10 +89,31 @@ public class GridCacheWriteBehindStoreSelfTest extends GridCacheWriteBehindStore
     }
 
     /**
+     * Simple store (with write coalescing) test.
+     *
+     * @throws Exception If failed.
+     */
+    public void testSimpleStoreWithCoalescing() throws Exception {
+        testSimpleStore(true);
+    }
+
+    /**
+     * Simple store (without write coalescing) test.
+     *
      * @throws Exception If failed.
      */
-    public void testSimpleStore() throws Exception {
-        initStore(2);
+    public void testSimpleStoreWithoutCoalescing() throws Exception {
+        testSimpleStore(false);
+    }
+
+    /**
+     * Simple store test.
+     *
+     * @param writeCoalescing Write coalescing flag.
+     * @throws Exception If failed.
+     */
+    private void testSimpleStore(boolean writeCoalescing) throws Exception {
+        initStore(2, writeCoalescing);
 
         try {
             store.write(new CacheEntryImpl<>(1, "v1"));
@@ -95,14 +135,35 @@ public class GridCacheWriteBehindStoreSelfTest extends GridCacheWriteBehindStore
     }
 
     /**
+     * Check that all values written to the store with coalescing will be in underlying store after timeout
+     * or due to size limits.
+     *
+     * @throws Exception If failed.
+     */
+    public void testValuePropagationWithCoalescing() throws Exception {
+        testValuePropagation(true);
+    }
+
+    /**
+     * Check that all values written to the store without coalescing will be in underlying store after timeout
+     * or due to size limits.
+     *
+     * @throws Exception If failed.
+     */
+    public void testValuePropagationWithoutCoalescing() throws Exception {
+        testValuePropagation(false);
+    }
+
+    /**
      * Check that all values written to the store will be in underlying store after timeout or due to size limits.
      *
+     * @param writeCoalescing Write coalescing flag.
      * @throws Exception If failed.
      */
     @SuppressWarnings({"NullableProblems"})
-    public void testValuePropagation() throws Exception {
+    private void testValuePropagation(boolean writeCoalescing) throws Exception {
         // Need to test size-based write.
-        initStore(1);
+        initStore(1, writeCoalescing);
 
         try {
             for (int i = 0; i < CACHE_SIZE * 2; i++)
@@ -132,12 +193,31 @@ public class GridCacheWriteBehindStoreSelfTest extends GridCacheWriteBehindStore
     }
 
     /**
+     * Tests store with write coalescing behaviour under continuous put of the same key with different values.
+     *
+     * @throws Exception If failed.
+     */
+    public void testContinuousPutWithCoalescing() throws Exception {
+        testContinuousPut(true);
+    }
+
+    /**
+     * Tests store without write coalescing behaviour under continuous put of the same key with different values.
+     *
+     * @throws Exception If failed.
+     */
+    public void testContinuousPutWithoutCoalescing() throws Exception {
+        testContinuousPut(false);
+    }
+
+    /**
      * Tests store behaviour under continuous put of the same key with different values.
      *
-     * @throws Exception If failed
+     * @param writeCoalescing Write coalescing flag for cache.
+     * @throws Exception If failed.
      */
-    public void testContinuousPut() throws Exception {
-        initStore(2);
+    private void testContinuousPut(boolean writeCoalescing) throws Exception {
+        initStore(2, writeCoalescing);
 
         try {
             final AtomicBoolean running = new AtomicBoolean(true);
@@ -169,17 +249,22 @@ public class GridCacheWriteBehindStoreSelfTest extends GridCacheWriteBehindStore
             }, 1, "put");
 
             U.sleep(FLUSH_FREQUENCY * 2 + 500);
+            running.set(false);
+            U.sleep(FLUSH_FREQUENCY * 2 + 500);
 
             int delegatePutCnt = delegate.getPutAllCount();
 
-            running.set(false);
 
             fut.get();
 
             log().info(">>> [putCnt = " + actualPutCnt.get() + ", delegatePutCnt=" + delegatePutCnt + "]");
 
             assertTrue("No puts were made to the underlying store", delegatePutCnt > 0);
-            assertTrue("Too many puts were made to the underlying store", delegatePutCnt < actualPutCnt.get() / 10);
+            if (store.getWriteCoalescing()) {
+                assertTrue("Too many puts were made to the underlying store", delegatePutCnt < actualPutCnt.get() / 10);
+            } else {
+                assertTrue("Too few puts cnt=" + actualPutCnt.get() + " << storePutCnt=" + delegatePutCnt, delegatePutCnt > actualPutCnt.get() / 2);
+            }
         }
         finally {
             shutdownStore();
@@ -193,13 +278,34 @@ public class GridCacheWriteBehindStoreSelfTest extends GridCacheWriteBehindStore
     }
 
     /**
+     * Tests that all values put into the store with write coalescing will be written to the underlying store
+     * after shutdown is called.
+     *
+     * @throws Exception If failed.
+     */
+    public void testShutdownWithCoalescing() throws Exception {
+        testShutdown(true);
+    }
+
+    /**
+     * Tests that all values put into the store without write coalescing will be written to the underlying store
+     * after shutdown is called.
+     *
+     * @throws Exception If failed.
+     */
+    public void testShutdownWithoutCoalescing() throws Exception {
+        testShutdown(false);
+    }
+
+    /**
      * Tests that all values were put into the store will be written to the underlying store
      * after shutdown is called.
      *
+     * @param writeCoalescing Write coalescing flag.
      * @throws Exception If failed.
      */
-    public void testShutdown() throws Exception {
-        initStore(2);
+    private void testShutdown(boolean writeCoalescing) throws Exception {
+        initStore(2, writeCoalescing);
 
         try {
             final AtomicBoolean running = new AtomicBoolean(true);
@@ -243,14 +349,35 @@ public class GridCacheWriteBehindStoreSelfTest extends GridCacheWriteBehindStore
 
     /**
      * Tests that all values will be written to the underlying store
+     * right in the same order as they were put into the store with coalescing.
+     *
+     * @throws Exception If failed.
+     */
+    public void testBatchApplyWithCoalescing() throws Exception {
+        testBatchApply(true);
+    }
+
+    /**
+     * Tests that all values will be written to the underlying store
+     * right in the same order as they were put into the store without coalescing.
+     *
+     * @throws Exception If failed.
+     */
+    public void testBatchApplyWithoutCoalescing() throws Exception {
+        testBatchApply(false);
+    }
+
+    /**
+     * Tests that all values will be written to the underlying store
      * right in the same order as they were put into the store.
      *
+     * @param writeCoalescing Write coalescing flag.
      * @throws Exception If failed.
      */
-    public void testBatchApply() throws Exception {
+    private void testBatchApply(boolean writeCoalescing) throws Exception {
         delegate = new GridCacheTestStore(new ConcurrentLinkedHashMap<Integer, String>());
 
-        initStore(1);
+        initStore(1, writeCoalescing);
 
         List<Integer> intList = new ArrayList<>(CACHE_SIZE);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/22580e19/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/IgnteCacheClientWriteBehindStoreNonCoalescingTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/IgnteCacheClientWriteBehindStoreNonCoalescingTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/IgnteCacheClientWriteBehindStoreNonCoalescingTest.java
new file mode 100644
index 0000000..8ea109d
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/IgnteCacheClientWriteBehindStoreNonCoalescingTest.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.store;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import javax.cache.Cache;
+import javax.cache.configuration.Factory;
+import javax.cache.processor.EntryProcessor;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.MutableEntry;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicWriteOrderMode;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.store.CacheStore;
+import org.apache.ignite.cache.store.CacheStoreAdapter;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.internal.processors.cache.IgniteCacheAbstractTest;
+import org.apache.ignite.lang.IgniteBiInClosure;
+import org.apache.ignite.lang.IgniteFuture;
+
+import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.CLOCK;
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+
+/**
+ * This class provides tests for {@link org.apache.ignite.internal.processors.cache.store.GridCacheWriteBehindStore} without write coalescing.
+ */
+public class IgnteCacheClientWriteBehindStoreNonCoalescingTest extends IgniteCacheAbstractTest {
+    /** {@inheritDoc} */
+    @Override protected int gridCount() {
+        return 1;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected CacheMode cacheMode() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected CacheAtomicityMode atomicityMode() {
+        return ATOMIC;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected CacheAtomicWriteOrderMode atomicWriteOrderMode() {
+        return CLOCK;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected NearCacheConfiguration nearConfiguration() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected Factory<CacheStore> cacheStoreFactory() {
+        return new TestIncrementStoreFactory();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNonCoalescingIncrementing() throws Exception {
+        Ignite ignite = grid(0);
+
+        IgniteCache<Integer, Integer> cache = ignite.cache(null);
+
+        assertEquals(cache.getConfiguration(CacheConfiguration.class).getCacheStoreFactory().getClass(),
+            TestIncrementStoreFactory.class);
+
+        Set<Integer> keys = new HashSet<>();
+
+        for (int i = 0; i < 1000; i++) {
+            keys.add(i);
+
+            cache.put(i, i);
+        }
+
+        Collection<IgniteFuture<?>> futs = new ArrayList<>();
+
+        for (int i = 0; i < 100; i++)
+            futs.add(updateKeys(cache, keys));
+
+        for (IgniteFuture<?> fut : futs)
+            fut.get();
+    }
+
+    /**
+     * Update specified keys in async mode.
+     *
+     * @param cache Cache to use.
+     * @param keys Keys to update.
+     * @return Future of the asynchronous {@code invokeAll} operation.
+     */
+    private IgniteFuture<?>  updateKeys(IgniteCache<Integer, Integer> cache, Set<Integer> keys) {
+        IgniteCache asyncCache = cache.withAsync();
+
+        // Using EntryProcessor.invokeAll to increment every value in place.
+        asyncCache.invokeAll(keys, new EntryProcessor<Integer, Integer, Object>() {
+            @Override public Object process(MutableEntry<Integer, Integer> entry, Object... arguments)
+                throws EntryProcessorException {
+                entry.setValue(entry.getValue() + 1);
+
+                return null;
+            }
+        });
+
+        return asyncCache.future();
+    }
+
+    /**
+     * Test increment store factory.
+     */
+    public static class TestIncrementStoreFactory implements Factory<CacheStore> {
+        /** {@inheritDoc} */
+        @Override public CacheStore create() {
+            return new TestIncrementStore();
+        }
+    }
+
+    /**
+     * Test cache store that asserts every overwritten integer value is replaced by exactly the old value plus one.
+     */
+    public static class TestIncrementStore extends CacheStoreAdapter<Object, Object> {
+        /** {@inheritDoc} */
+        @Override public void loadCache(IgniteBiInClosure<Object, Object> clo, Object... args) {
+            for (Map.Entry<Object, Object> e : storeMap.entrySet())
+                clo.apply(e.getKey(), e.getValue());
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object load(Object key) {
+            return storeMap.get(key);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void write(Cache.Entry<? extends Object, ? extends Object> entry) {
+            Object oldValue = storeMap.put(entry.getKey(), entry.getValue());
+
+            if (oldValue instanceof Integer && entry.getValue() instanceof Integer) {
+                Integer oldInt = (Integer)oldValue;
+                Integer newInt = (Integer)entry.getValue();
+
+                assertTrue(
+                    "newValue(" + newInt + ") != oldValue(" + oldInt + ")+1 !",
+                    newInt == oldInt + 1);
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public void delete(Object key) {
+            storeMap.remove(key);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/22580e19/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheWriteBehindTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheWriteBehindTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheWriteBehindTestSuite.java
index b4cdfa8..dff93ff 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheWriteBehindTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheWriteBehindTestSuite.java
@@ -26,6 +26,7 @@ import org.apache.ignite.internal.processors.cache.store.GridCacheWriteBehindSto
 import org.apache.ignite.internal.processors.cache.store.GridCacheWriteBehindStoreReplicatedTest;
 import org.apache.ignite.internal.processors.cache.store.GridCacheWriteBehindStoreSelfTest;
 import org.apache.ignite.internal.processors.cache.store.IgnteCacheClientWriteBehindStoreAtomicTest;
+import org.apache.ignite.internal.processors.cache.store.IgnteCacheClientWriteBehindStoreNonCoalescingTest;
 import org.apache.ignite.internal.processors.cache.store.IgnteCacheClientWriteBehindStoreTxTest;
 
 /**
@@ -49,6 +50,7 @@ public class IgniteCacheWriteBehindTestSuite extends TestSuite {
         suite.addTest(new TestSuite(GridCachePartitionedWritesTest.class));
         suite.addTest(new TestSuite(IgnteCacheClientWriteBehindStoreAtomicTest.class));
         suite.addTest(new TestSuite(IgnteCacheClientWriteBehindStoreTxTest.class));
+        suite.addTest(new TestSuite(IgnteCacheClientWriteBehindStoreNonCoalescingTest.class));
 
         return suite;
     }


Mime
View raw message