ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [24/30] incubator-ignite git commit: # GG-9973: Fixed.
Date Thu, 02 Apr 2015 19:23:07 GMT
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6629012/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStore.java
new file mode 100644
index 0000000..5ce42f9
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStore.java
@@ -0,0 +1,1015 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.store;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.store.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.util.tostring.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.internal.util.worker.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.lifecycle.*;
+import org.apache.ignite.thread.*;
+import org.jetbrains.annotations.*;
+import org.jsr166.*;
+
+import javax.cache.integration.*;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+import java.util.concurrent.locks.*;
+
+import static javax.cache.Cache.*;
+
+/**
+ * Internal wrapper for a {@link CacheStore} that enables write-behind logic.
+ * <p/>
+ * The general purpose of this approach is to reduce cache store load under high
+ * store update rate. The idea is to cache all write and remove operations in a pending
+ * map and delegate these changes to the underlying store either after timeout or
+ * if size of a pending map exceeded some pre-configured value. Another performance gain
+ * is achieved due to combining a group of similar operations to a single batch update.
+ * <p/>
+ * The essential flush size for the write-behind cache should be at least the estimated
+ * count of simultaneously written keys. In case of significantly smaller value there would
+ * be triggered a lot of flush events that will result in a high cache store load.
+ * <p/>
+ * Since write operations to the cache store are deferred, transaction support is lost; no
+ * transaction objects are passed to the underlying store.
+ */
+public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, LifecycleAware {
+    /** Default write cache initial capacity. */
+    public static final int DFLT_INITIAL_CAPACITY = 1024;
+
+    /** Overflow ratio for critical cache size calculation. */
+    public static final float CACHE_OVERFLOW_RATIO = 1.5f;
+
+    /** Default concurrency level of write cache. */
+    public static final int DFLT_CONCUR_LVL = 64;
+
+    /** Write cache initial capacity. */
+    private int initCap = DFLT_INITIAL_CAPACITY;
+
+    /** Concurrency level for write cache access. */
+    private int concurLvl = DFLT_CONCUR_LVL;
+
+    /** When cache size exceeds this value eldest entry will be stored to the underlying store. */
+    private int cacheMaxSize = CacheConfiguration.DFLT_WRITE_BEHIND_FLUSH_SIZE;
+
+    /** Critical cache size. If cache size exceeds this value, data flush performed synchronously. */
+    private int cacheCriticalSize;
+
+    /** Count of worker threads performing underlying store updates. */
+    private int flushThreadCnt = CacheConfiguration.DFLT_WRITE_FROM_BEHIND_FLUSH_THREAD_CNT;
+
+    /** Cache flush frequency. All pending operations will be performed in not less than this value ms. */
+    private long cacheFlushFreq = CacheConfiguration.DFLT_WRITE_BEHIND_FLUSH_FREQUENCY;
+
+    /** Maximum batch size for put and remove operations */
+    private int batchSize = CacheConfiguration.DFLT_WRITE_BEHIND_BATCH_SIZE;
+
+    /** Grid name. */
+    private String gridName;
+
+    /** Cache name. */
+    private String cacheName;
+
+    /** Underlying store. */
+    private CacheStore<K, V> store;
+
+    /** Write cache. */
+    private ConcurrentLinkedHashMap<K, StatefulValue<K, V>> writeCache;
+
+    /** Flusher threads. */
+    private GridWorker[] flushThreads;
+
+    /** Atomic flag indicating store shutdown. */
+    private AtomicBoolean stopping = new AtomicBoolean(true);
+
+    /** Flush lock. */
+    private Lock flushLock = new ReentrantLock();
+
+    /** Condition to determine records available for flush. */
+    private Condition canFlush = flushLock.newCondition();
+
+    /** Variable for counting total cache overflows. */
+    private AtomicInteger cacheTotalOverflowCntr = new AtomicInteger();
+
+    /** Variable contains current number of overflow events. */
+    private AtomicInteger cacheOverflowCntr = new AtomicInteger();
+
+    /** Variable for counting key-value pairs that are in {@link ValueStatus#RETRY} state. */
+    private AtomicInteger retryEntriesCnt = new AtomicInteger();
+
+    /** Log. */
+    private IgniteLogger log;
+
+    /** Store manager. */
+    private CacheStoreManager storeMgr;
+
+    /**
+     * Creates a write-behind cache store for the given store.
+     *
+     * @param storeMgr Store manager.
+     * @param gridName Grid name.
+     * @param cacheName Cache name.
+     * @param log Grid logger.
+     * @param store {@code CacheStore} that needs to be wrapped.
+     */
+    public GridCacheWriteBehindStore(
+        CacheStoreManager storeMgr,
+        String gridName,
+        String cacheName,
+        IgniteLogger log,
+        CacheStore<K, V> store) {
+        this.storeMgr = storeMgr;
+        this.gridName = gridName;
+        this.cacheName = cacheName;
+        this.log = log;
+        this.store = store;
+    }
+
+    /**
+     * Sets initial capacity for the write cache.
+     *
+     * @param initCap Initial capacity.
+     */
+    public void setInitialCapacity(int initCap) {
+        this.initCap = initCap;
+    }
+
+    /**
+     * Sets concurrency level for the write cache. Concurrency level is expected count of concurrent threads
+     * attempting to update cache.
+     *
+     * @param concurLvl Concurrency level.
+     */
+    public void setConcurrencyLevel(int concurLvl) {
+        this.concurLvl = concurLvl;
+    }
+
+    /**
+     * Sets the maximum size of the write cache. When the count of unique keys in write cache exceeds this value,
+     * the eldest entry in the cache is immediately scheduled for write to the underlying store.
+     *
+     * @param cacheMaxSize Max cache size.
+     */
+    public void setFlushSize(int cacheMaxSize) {
+        this.cacheMaxSize = cacheMaxSize;
+    }
+
+    /**
+     * Gets the maximum size of the write-behind buffer. When the count of unique keys
+     * in write buffer exceeds this value, the buffer is scheduled for write to the underlying store.
+     * <p/>
+     * If this value is {@code 0}, then flush is performed only on time-elapsing basis. However,
+     * when this value is {@code 0}, the cache critical size is set to
+     * {@link CacheConfiguration#DFLT_WRITE_BEHIND_CRITICAL_SIZE}
+     *
+     * @return Buffer size that triggers flush procedure.
+     */
+    public int getWriteBehindFlushSize() {
+        return cacheMaxSize;
+    }
+
+    /**
+     * Sets the number of threads that will perform store update operations.
+     *
+     * @param flushThreadCnt Count of worker threads.
+     */
+    public void setFlushThreadCount(int flushThreadCnt) {
+        this.flushThreadCnt = flushThreadCnt;
+    }
+
+    /**
+     * Gets the number of flush threads that will perform store update operations.
+     *
+     * @return Count of worker threads.
+     */
+    public int getWriteBehindFlushThreadCount() {
+        return flushThreadCnt;
+    }
+
+    /**
+     * Sets the cache flush frequency. All pending operations on the underlying store will be performed
+     * within time interval not less than this value.
+     *
+     * @param cacheFlushFreq Time interval value in milliseconds.
+     */
+    public void setFlushFrequency(long cacheFlushFreq) {
+        this.cacheFlushFreq = cacheFlushFreq;
+    }
+
+    /**
+     * Gets the cache flush frequency. All pending operations on the underlying store will be performed
+     * within time interval not less than this value.
+     * <p/>
+     * If this value is {@code 0}, then flush is performed only when buffer size exceeds flush size.
+     *
+     * @return Flush frequency in milliseconds.
+     */
+    public long getWriteBehindFlushFrequency() {
+        return cacheFlushFreq;
+    }
+
+    /**
+     * Sets the maximum count of similar operations that can be grouped to a single batch.
+     *
+     * @param batchSize Maximum count of batch.
+     */
+    public void setBatchSize(int batchSize) {
+        this.batchSize = batchSize;
+    }
+
+    /**
+     * Gets the maximum count of similar (put or remove) operations that can be grouped to a single batch.
+     *
+     * @return Maximum size of batch.
+     */
+    public int getWriteBehindStoreBatchSize() {
+        return batchSize;
+    }
+
+    /**
+     * Gets count of entries that were processed by the write-behind store and have not been
+     * flushed to the underlying store yet.
+     *
+     * @return Total count of entries in cache store internal buffer.
+     */
+    public int getWriteBehindBufferSize() {
+        return writeCache.sizex();
+    }
+
+    /**
+     * @return Underlying store.
+     */
+    public CacheStore<K, V> store() {
+        return store;
+    }
+
+    /**
+     * Performs all the initialization logic for write-behind cache store.
+     * This class must not be used until this method returns.
+     */
+    @Override public void start() {
+        assert cacheFlushFreq != 0 || cacheMaxSize != 0;
+
+        if (stopping.compareAndSet(true, false)) {
+            if (log.isDebugEnabled())
+                log.debug("Starting write-behind store for cache '" + cacheName + '\'');
+
+            cacheCriticalSize = (int)(cacheMaxSize * CACHE_OVERFLOW_RATIO);
+
+            if (cacheCriticalSize == 0)
+                cacheCriticalSize = CacheConfiguration.DFLT_WRITE_BEHIND_CRITICAL_SIZE;
+
+            flushThreads = new GridWorker[flushThreadCnt];
+
+            writeCache = new ConcurrentLinkedHashMap<>(initCap, 0.75f, concurLvl);
+
+            for (int i = 0; i < flushThreads.length; i++) {
+                flushThreads[i] = new Flusher(gridName, "flusher-" + i, log);
+
+                new IgniteThread(flushThreads[i]).start();
+            }
+        }
+    }
+
+    /**
+     * Gets count of write buffer overflow events since initialization. Each overflow event causes
+     * the ongoing flush operation to be performed synchronously.
+     *
+     * @return Count of cache overflow events since start.
+     */
+    public int getWriteBehindTotalCriticalOverflowCount() {
+        return cacheTotalOverflowCntr.get();
+    }
+
+    /**
+     * Gets count of write buffer overflow events in progress at the moment. Each overflow event causes
+     * the ongoing flush operation to be performed synchronously.
+     *
+     * @return Count of cache overflow events currently in progress.
+     */
+    public int getWriteBehindCriticalOverflowCount() {
+        return cacheOverflowCntr.get();
+    }
+
+    /**
+     * Gets count of cache entries that are in a store-retry state. An entry is assigned a store-retry state
+     * when underlying store failed due to some reason and cache has enough space to retain this entry till
+     * the next try.
+     *
+     * @return Count of entries in store-retry state.
+     */
+    public int getWriteBehindErrorRetryCount() {
+        return retryEntriesCnt.get();
+    }
+
+    /**
+     * Performs shutdown logic for store. No put, get and remove requests will be processed after
+     * this method is called.
+     */
+    @Override public void stop() {
+        if (stopping.compareAndSet(false, true)) {
+            if (log.isDebugEnabled())
+                log.debug("Stopping write-behind store for cache '" + cacheName + '\'');
+
+            wakeUp();
+
+            boolean graceful = true;
+
+            for (GridWorker worker : flushThreads)
+                graceful &= U.join(worker, log);
+
+            if (!graceful)
+                log.warning("Shutdown was aborted");
+        }
+    }
+
+    /**
+     * Forces all entries collected to be flushed to the underlying store.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void forceFlush() throws IgniteCheckedException {
+        wakeUp();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void loadCache(IgniteBiInClosure<K, V> clo, @Nullable Object... args) {
+        store.loadCache(clo, args);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Map<K, V> loadAll(Iterable<? extends K> keys) {
+        if (log.isDebugEnabled())
+            log.debug("Store load all [keys=" + keys + ']');
+
+        Map<K, V> loaded = new HashMap<>();
+
+        Collection<K> remaining = new LinkedList<>();
+
+        for (K key : keys) {
+            StatefulValue<K, V> val = writeCache.get(key);
+
+            if (val != null) {
+                val.readLock().lock();
+
+                try {
+                    if (val.operation() == StoreOperation.PUT)
+                        loaded.put(key, val.entry().getValue());
+                    else
+                        assert val.operation() == StoreOperation.RMV : val.operation();
+                }
+                finally {
+                    val.readLock().unlock();
+                }
+            }
+            else
+                remaining.add(key);
+        }
+
+        // For items that were not found in queue.
+        if (!remaining.isEmpty()) {
+            Map<K, V> loaded0 = store.loadAll(remaining);
+
+            if (loaded0 != null)
+                loaded.putAll(loaded0);
+        }
+
+        return loaded;
+    }
+
+    /** {@inheritDoc} Answers from the write cache when the key has a buffered operation, else from the store. */
+    @Override public V load(K key) {
+        if (log.isDebugEnabled())
+            log.debug("Store load [key=" + key + ']');
+
+        StatefulValue<K, V> val = writeCache.get(key);
+
+        if (val != null) {
+            val.readLock().lock(); // Writers update the entry under its write lock, so read it under the read lock.
+
+            try {
+                switch (val.operation()) {
+                    case PUT:
+                        return val.entry().getValue(); // Buffered put: return the not-yet-flushed value.
+
+                    case RMV:
+                        return null; // Buffered remove: report the key as absent without asking the store.
+
+                    default:
+                        assert false : "Unexpected operation: " + val.operation();
+                }
+            }
+            finally {
+                val.readLock().unlock();
+            }
+        }
+
+        return store.load(key);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeAll(Collection<Entry<? extends K, ? extends V>> entries) {
+        for (Entry<? extends K, ? extends V> e : entries)
+            write(e);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void write(Entry<? extends K, ? extends V> entry) {
+        try {
+            if (log.isDebugEnabled())
+                log.debug("Store put [key=" + entry.getKey() + ", val=" + entry.getValue() + ']');
+
+            updateCache(entry.getKey(), entry, StoreOperation.PUT);
+        }
+        catch (IgniteInterruptedCheckedException e) {
+            throw new CacheWriterException(U.convertExceptionNoWrap(e));
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void deleteAll(Collection<?> keys) {
+        for (Object key : keys)
+            delete(key);
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override public void delete(Object key) {
+        try {
+            if (log.isDebugEnabled())
+                log.debug("Store remove [key=" + key + ']');
+
+            updateCache((K)key, null, StoreOperation.RMV);
+        }
+        catch (IgniteInterruptedCheckedException e) {
+            throw new CacheWriterException(U.convertExceptionNoWrap(e));
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void sessionEnd(boolean commit) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridCacheWriteBehindStore.class, this);
+    }
+
+    /**
+     * Performs flush-consistent cache update for the given key.
+     *
+     * @param key Key for which update is performed.
+     * @param val New value, may be null for remove operation.
+     * @param operation Store operation (put or remove) to record for the key.
+     * @throws IgniteInterruptedCheckedException If interrupted while waiting for value to be flushed.
+     */
+    private void updateCache(K key,
+        @Nullable Entry<? extends K, ? extends V> val,
+        StoreOperation operation)
+        throws IgniteInterruptedCheckedException {
+        StatefulValue<K, V> newVal = new StatefulValue<>(val, operation);
+
+        StatefulValue<K, V> prev;
+
+        while ((prev = writeCache.putIfAbsent(key, newVal)) != null) {
+            prev.writeLock().lock();
+
+            try {
+                if (prev.status() == ValueStatus.PENDING) {
+                    // Flush process in progress, try again.
+                    prev.waitForFlush();
+
+                    continue;
+                }
+                else if (prev.status() == ValueStatus.FLUSHED)
+                    // This entry was deleted from map before we acquired the lock.
+                    continue;
+                else if (prev.status() == ValueStatus.RETRY)
+                    // New value has come, old value is no longer in RETRY state,
+                    retryEntriesCnt.decrementAndGet();
+
+                assert prev.status() == ValueStatus.NEW || prev.status() == ValueStatus.RETRY;
+
+                prev.update(val, operation, ValueStatus.NEW);
+
+                break;
+            }
+            finally {
+                prev.writeLock().unlock();
+            }
+        }
+
+        // Now check the map size
+        if (writeCache.sizex() > cacheCriticalSize)
+            // Perform single store update in the same thread.
+            flushSingleValue();
+        else if (cacheMaxSize > 0 && writeCache.sizex() > cacheMaxSize)
+            wakeUp();
+    }
+
+    /**
+     * Flushes one upcoming value to the underlying store. Called from
+     * {@link #updateCache(Object, Entry, StoreOperation)} method in case when current map size exceeds
+     * critical size.
+     */
+    private void flushSingleValue() {
+        cacheOverflowCntr.incrementAndGet();
+
+        try {
+            Map<K, StatefulValue<K, V>> batch = null;
+
+            for (Map.Entry<K, StatefulValue<K, V>> e : writeCache.entrySet()) {
+                StatefulValue<K, V> val = e.getValue();
+
+                val.writeLock().lock();
+
+                try {
+                    ValueStatus status = val.status();
+
+                    if (acquired(status))
+                        // Another thread is helping us, continue to the next entry.
+                        continue;
+
+                    if (val.status() == ValueStatus.RETRY)
+                        retryEntriesCnt.decrementAndGet();
+
+                    assert retryEntriesCnt.get() >= 0;
+
+                    val.status(ValueStatus.PENDING);
+
+                    batch = Collections.singletonMap(e.getKey(), val);
+                }
+                finally {
+                    val.writeLock().unlock();
+                }
+
+                if (!batch.isEmpty()) {
+                    applyBatch(batch, false);
+
+                    cacheTotalOverflowCntr.incrementAndGet();
+
+                    return;
+                }
+            }
+        }
+        finally {
+            cacheOverflowCntr.decrementAndGet();
+        }
+    }
+
+    /**
+     * Performs batch operation on underlying store.
+     *
+     * @param valMap Batch map.
+     * @param initSes {@code True} if need to initialize session.
+     */
+    private void applyBatch(Map<K, StatefulValue<K, V>> valMap, boolean initSes) {
+        assert valMap.size() <= batchSize;
+
+        StoreOperation operation = null;
+
+        // Construct a map for underlying store
+        Map<K, Entry<? extends K, ? extends V>> batch = U.newLinkedHashMap(valMap.size());
+
+        for (Map.Entry<K, StatefulValue<K, V>> e : valMap.entrySet()) {
+            if (operation == null)
+                operation = e.getValue().operation();
+
+            assert operation == e.getValue().operation();
+
+            assert e.getValue().status() == ValueStatus.PENDING;
+
+            batch.put(e.getKey(), e.getValue().entry());
+        }
+
+        if (updateStore(operation, batch, initSes)) {
+            for (Map.Entry<K, StatefulValue<K, V>> e : valMap.entrySet()) {
+                StatefulValue<K, V> val = e.getValue();
+
+                val.writeLock().lock();
+
+                try {
+                    val.status(ValueStatus.FLUSHED);
+
+                    StatefulValue<K, V> prev = writeCache.remove(e.getKey());
+
+                    // Additional check to ensure consistency.
+                    assert prev == val : "Map value for key " + e.getKey() + " was updated during flush";
+
+                    val.signalFlushed();
+                }
+                finally {
+                    val.writeLock().unlock();
+                }
+            }
+        }
+        else {
+            // Exception occurred, we must set RETRY status
+            for (StatefulValue<K, V> val : valMap.values()) {
+                val.writeLock().lock();
+
+                try {
+                    val.status(ValueStatus.RETRY);
+
+                    retryEntriesCnt.incrementAndGet();
+
+                    val.signalFlushed();
+                }
+                finally {
+                    val.writeLock().unlock();
+                }
+            }
+        }
+    }
+
+    /**
+     * Tries to update store with the given values and returns {@code true} in case of success.
+     *
+     * <p/> If any exception occurs in the underlying store, this method checks the map size.
+     * If map size exceeds some critical value, then it returns {@code true} and this value will
+     * be lost. If map size does not exceed critical value, it will return false and value will
+     * be retained in write cache.
+     *
+     * @param operation Status indicating operation that should be performed.
+     * @param vals Key-Value map.
+     * @param initSes {@code True} if need to initialize session.
+     * @return {@code true} if value may be deleted from the write cache,
+     *         {@code false} otherwise
+     */
+    private boolean updateStore(StoreOperation operation,
+        Map<K, Entry<? extends K, ? extends  V>> vals,
+        boolean initSes) {
+
+        if (initSes && storeMgr != null)
+            storeMgr.writeBehindSessionInit();
+
+        try {
+            boolean threwEx = true;
+
+            try {
+                switch (operation) {
+                    case PUT:
+                        store.writeAll(vals.values());
+
+                        break;
+
+                    case RMV:
+                        store.deleteAll(vals.keySet());
+
+                        break;
+
+                    default:
+                        assert false : "Unexpected operation: " + operation;
+                }
+
+                threwEx = false;
+
+                return true;
+            }
+            finally {
+                if (initSes && storeMgr != null)
+                    storeMgr.writeBehindSessionEnd(threwEx);
+            }
+        }
+        catch (Exception e) {
+            LT.warn(log, e, "Unable to update underlying store: " + store);
+
+            if (writeCache.sizex() > cacheCriticalSize || stopping.get()) {
+                for (Map.Entry<K, Entry<? extends K, ? extends  V>> entry : vals.entrySet()) {
+                    Object val = entry.getValue() != null ? entry.getValue().getValue() : null;
+
+                    log.warning("Failed to update store (value will be lost as current buffer size is greater " +
+                        "than 'cacheCriticalSize' or node has been stopped before store was repaired) [key=" +
+                        entry.getKey() + ", val=" + val + ", op=" + operation + "]");
+                }
+
+                return true;
+            }
+
+            return false;
+        }
+    }
+
+    /**
+     * Wakes up flushing threads if map size exceeded maximum value or in case of shutdown.
+     */
+    private void wakeUp() {
+        flushLock.lock();
+
+        try {
+            canFlush.signalAll();
+        }
+        finally {
+            flushLock.unlock();
+        }
+    }
+
+    /**
+     * Thread that performs time-based flushing of written values to the underlying storage.
+     */
+    private class Flusher extends GridWorker {
+        /** {@inheritDoc} */
+        protected Flusher(String gridName, String name, IgniteLogger log) {
+            super(gridName, name, log);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
+            while (!stopping.get() || writeCache.sizex() > 0) {
+                awaitOperationsAvailable();
+
+                flushCache(writeCache.entrySet().iterator());
+            }
+        }
+
+        /**
+         * Waits for work: returns at once if the write cache already exceeds the flush size, otherwise parks on
+         * {@code canFlush} (timed if a flush frequency is set), repeating while the cache is empty and not stopping.
+         * @throws InterruptedException If awaiting was interrupted.
+         */
+        private void awaitOperationsAvailable() throws InterruptedException {
+            flushLock.lock();
+            // Do not park once stopping: no further signal is guaranteed after stop(), so an untimed await could hang.
+            try {
+                do {
+                    if (!stopping.get() && (writeCache.sizex() <= cacheMaxSize || cacheMaxSize == 0)) {
+                        if (cacheFlushFreq > 0)
+                            canFlush.await(cacheFlushFreq, TimeUnit.MILLISECONDS);
+                        else
+                            canFlush.await();
+                    }
+                }
+                while (writeCache.sizex() == 0 && !stopping.get());
+            }
+            finally {
+                flushLock.unlock();
+            }
+        }
+
+        /**
+         * Removes values from the write cache and performs corresponding operation
+         * on the underlying store.
+         *
+         * @param it Iterator for write cache.
+         */
+        private void flushCache(Iterator<Map.Entry<K,StatefulValue<K, V>>> it) {
+            StoreOperation operation = null;
+
+            Map<K, StatefulValue<K, V>> batch = null;
+            Map<K, StatefulValue<K, V>> pending  = U.newLinkedHashMap(batchSize);
+
+            while (it.hasNext()) {
+                Map.Entry<K, StatefulValue<K, V>> e = it.next();
+
+                StatefulValue<K, V> val = e.getValue();
+
+                val.writeLock().lock();
+
+                try {
+                    ValueStatus status = val.status();
+
+                    if (acquired(status))
+                        // Another thread is helping us, continue to the next entry.
+                        continue;
+
+                    if (status == ValueStatus.RETRY)
+                        retryEntriesCnt.decrementAndGet();
+
+                    assert retryEntriesCnt.get() >= 0;
+
+                    val.status(ValueStatus.PENDING);
+
+                    // We scan for the next operation and apply batch on operation change. Null means new batch.
+                    if (operation == null)
+                        operation = val.operation();
+
+                    if (operation != val.operation()) {
+                        // Operation is changed, so we need to perform a batch.
+                        batch = pending;
+                        pending = U.newLinkedHashMap(batchSize);
+
+                        operation = val.operation();
+
+                        pending.put(e.getKey(), val);
+                    }
+                    else
+                        pending.put(e.getKey(), val);
+
+                    if (pending.size() == batchSize) {
+                        batch = pending;
+                        pending = U.newLinkedHashMap(batchSize);
+
+                        operation = null;
+                    }
+                }
+                finally {
+                    val.writeLock().unlock();
+                }
+
+                if (batch != null && !batch.isEmpty()) {
+                    applyBatch(batch, true);
+                    batch = null;
+                }
+            }
+
+            // Process the remainder.
+            if (!pending.isEmpty())
+                applyBatch(pending, true);
+        }
+    }
+
+    /**
+     * For test purposes only.
+     *
+     * @return Write cache for the underlying store operations.
+     */
+    Map<K, StatefulValue<K, V>> writeCache() {
+        return writeCache;
+    }
+
+    /**
+     * Enumeration that represents possible operations on the underlying store.
+     */
+    private enum StoreOperation {
+        /** Put key-value pair to the underlying store. */
+        PUT,
+
+        /** Remove key from the underlying store. */
+        RMV
+    }
+
+    /**
+     * Enumeration that represents possible states of value in the map.
+     */
+    private enum ValueStatus {
+        /** Value is scheduled for write or delete from the underlying store but has not been captured by flusher. */
+        NEW,
+
+        /** Value is captured by flusher and store operation is performed at the moment. */
+        PENDING,
+
+        /** Store operation has failed and it will be re-tried at the next flush. */
+        RETRY,
+
+        /** Store operation succeeded and this value will be removed by flusher. */
+        FLUSHED,
+    }
+
+    /**
+     * Checks if given status indicates pending or complete flush operation.
+     *
+     * @param status Status to check.
+     * @return {@code true} if status indicates any pending or complete store update operation.
+     */
+    private boolean acquired(ValueStatus status) {
+        return status == ValueStatus.PENDING || status == ValueStatus.FLUSHED;
+    }
+
+    /**
+     * A state-value-operation trio.
+     *
+     * @param <V> Value type.
+     */
+    private static class StatefulValue<K, V> extends ReentrantReadWriteLock {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** Value. */
+        @GridToStringInclude
+        private Entry<? extends K, ? extends V> val;
+
+        /** Store operation. */
+        private StoreOperation storeOperation;
+
+        /** Value status. */
+        private ValueStatus valStatus;
+
+        /** Condition to wait for flush event */
+        private Condition flushCond = writeLock().newCondition();
+
+        /**
+         * Creates a state-value pair with {@link ValueStatus#NEW} status.
+         *
+         * @param val Value.
+         * @param storeOperation Store operation.
+         */
+        private StatefulValue(Entry<? extends K, ? extends V> val, StoreOperation storeOperation) {
+            assert storeOperation == StoreOperation.PUT || storeOperation == StoreOperation.RMV;
+
+            this.val = val;
+            this.storeOperation = storeOperation;
+            valStatus = ValueStatus.NEW;
+        }
+
+        /**
+         * @return Stored value.
+         */
+        private Entry<? extends K, ? extends V> entry() {
+            return val;
+        }
+
+        /**
+         * @return Store operation.
+         */
+        private StoreOperation operation() {
+            return storeOperation;
+        }
+
+        /**
+         * @return Value status
+         */
+        private ValueStatus status() {
+            return valStatus;
+        }
+
+        /**
+         * Updates value status.
+         *
+         * @param valStatus Value status.
+         */
+        private void status(ValueStatus valStatus) {
+            this.valStatus = valStatus;
+        }
+
+        /**
+         * Updates both value and value status.
+         *
+         * @param val Value.
+         * @param storeOperation Store operation.
+         * @param valStatus Value status.
+         */
+        private void update(@Nullable Entry<? extends K, ? extends V> val,
+            StoreOperation storeOperation,
+            ValueStatus valStatus) {
+            this.val = val;
+            this.storeOperation = storeOperation;
+            this.valStatus = valStatus;
+        }
+
+        /**
+         * Awaits a signal on flush condition
+         *
+         * @throws IgniteInterruptedCheckedException If thread was interrupted.
+         */
+        private void waitForFlush() throws IgniteInterruptedCheckedException {
+            U.await(flushCond);
+        }
+
+        /**
+         * Signals flush condition.
+         */
+        @SuppressWarnings({"SignalWithoutCorrespondingAwait"})
+        private void signalFlushed() {
+            flushCond.signalAll();
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (!(o instanceof StatefulValue))
+                return false;
+
+            StatefulValue other = (StatefulValue)o;
+
+            return F.eq(val, other.val) && F.eq(valStatus, other.valStatus);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            int res = val != null ? val.hashCode() : 0;
+
+            res = 31 * res + valStatus.hashCode();
+
+            return res;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(StatefulValue.class, this);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6629012/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index acd3202..1b66b4a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -24,6 +24,7 @@ import org.apache.ignite.internal.managers.communication.*;
 import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.distributed.near.*;
+import org.apache.ignite.internal.processors.cache.store.*;
 import org.apache.ignite.internal.processors.cache.version.*;
 import org.apache.ignite.internal.transactions.*;
 import org.apache.ignite.internal.util.*;
@@ -444,11 +445,11 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
      *
      * @return Store manager.
      */
-    protected GridCacheStoreManager store() {
+    protected CacheStoreManager store() {
         if (!activeCacheIds().isEmpty()) {
             int cacheId = F.first(activeCacheIds());
 
-            GridCacheStoreManager store = cctx.cacheContext(cacheId).store();
+            CacheStoreManager store = cctx.cacheContext(cacheId).store();
 
             return store.configured() ? store : null;
         }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6629012/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index f2407ce..10146a4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -25,6 +25,7 @@ import org.apache.ignite.internal.managers.communication.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.distributed.near.*;
 import org.apache.ignite.internal.processors.cache.dr.*;
+import org.apache.ignite.internal.processors.cache.store.*;
 import org.apache.ignite.internal.processors.cache.version.*;
 import org.apache.ignite.internal.processors.dr.*;
 import org.apache.ignite.internal.transactions.*;
@@ -370,7 +371,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                 }
 
                 return new GridFinishedFuture<>(
-                    cacheCtx.store().loadAllFromStore(this, keys, c));
+                    cacheCtx.store().loadAll(this, keys, c));
             }
             catch (IgniteCheckedException e) {
                 return new GridFinishedFuture<>(e);
@@ -387,7 +388,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                             return false;
                         }
 
-                        return cacheCtx.store().loadAllFromStore(IgniteTxLocalAdapter.this, keys, c);
+                        return cacheCtx.store().loadAll(IgniteTxLocalAdapter.this, keys, c);
                     }
                 },
                 true);
@@ -492,17 +493,17 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
      */
     @SuppressWarnings({"CatchGenericClass"})
     protected void batchStoreCommit(Iterable<IgniteTxEntry> writeEntries) throws IgniteCheckedException {
-        GridCacheStoreManager store = store();
+        CacheStoreManager store = store();
 
-        if (store != null && store.writeThrough() && storeEnabled() &&
-            (!internal() || groupLock()) && (near() || store.writeToStoreFromDht())) {
+        if (store != null && store.isWriteThrough() && storeEnabled() &&
+            (!internal() || groupLock()) && (near() || store.isWriteToStoreFromDht())) {
             try {
                 if (writeEntries != null) {
                     Map<Object, IgniteBiTuple<Object, GridCacheVersion>> putMap = null;
                     List<Object> rmvCol = null;
-                    GridCacheStoreManager writeStore = null;
+                    CacheStoreManager writeStore = null;
 
-                    boolean skipNear = near() && store.writeToStoreFromDht();
+                    boolean skipNear = near() && store.isWriteToStoreFromDht();
 
                     for (IgniteTxEntry e : writeEntries) {
                         if (skipNear && e.cached().isNear())
@@ -527,7 +528,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                             if (rmvCol != null && !rmvCol.isEmpty()) {
                                 assert writeStore != null;
 
-                                writeStore.removeAllFromStore(this, rmvCol);
+                                writeStore.removeAll(this, rmvCol);
 
                                 // Reset.
                                 rmvCol.clear();
@@ -537,7 +538,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
 
                             // Batch-process puts if cache ID has changed.
                             if (writeStore != null && writeStore != cacheCtx.store() && putMap != null && !putMap.isEmpty()) {
-                                writeStore.putAllToStore(this, putMap);
+                                writeStore.putAll(this, putMap);
 
                                 // Reset.
                                 putMap.clear();
@@ -568,7 +569,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                             if (putMap != null && !putMap.isEmpty()) {
                                 assert writeStore != null;
 
-                                writeStore.putAllToStore(this, putMap);
+                                writeStore.putAll(this, putMap);
 
                                 // Reset.
                                 putMap.clear();
@@ -577,7 +578,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                             }
 
                             if (writeStore != null && writeStore != cacheCtx.store() && rmvCol != null && !rmvCol.isEmpty()) {
-                                writeStore.removeAllFromStore(this, rmvCol);
+                                writeStore.removeAll(this, rmvCol);
 
                                 // Reset.
                                 rmvCol.clear();
@@ -609,7 +610,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                         assert writeStore != null;
 
                         // Batch put at the end of transaction.
-                        writeStore.putAllToStore(this, putMap);
+                        writeStore.putAll(this, putMap);
                     }
 
                     if (rmvCol != null && !rmvCol.isEmpty()) {
@@ -617,12 +618,12 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                         assert writeStore != null;
 
                         // Batch remove at the end of transaction.
-                        writeStore.removeAllFromStore(this, rmvCol);
+                        writeStore.removeAll(this, rmvCol);
                     }
                 }
 
                 // Commit while locks are held.
-                store.txEnd(this, true);
+                store.sessionEnd(this, true);
             }
             catch (IgniteCheckedException ex) {
                 commitError(ex);
@@ -981,11 +982,11 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
             }
         }
         else {
-            GridCacheStoreManager store = store();
+            CacheStoreManager store = store();
 
             if (store != null && (!internal() || groupLock())) {
                 try {
-                    store.txEnd(this, true);
+                    store.sessionEnd(this, true);
                 }
                 catch (IgniteCheckedException e) {
                     commitError(e);
@@ -1086,11 +1087,11 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
 
                 cctx.tm().rollbackTx(this);
 
-                GridCacheStoreManager store = store();
+                CacheStoreManager store = store();
 
-                if (store != null && (near() || store.writeToStoreFromDht())) {
+                if (store != null && (near() || store.isWriteToStoreFromDht())) {
                     if (!internal() || groupLock())
-                        store.txEnd(this, false);
+                        store.sessionEnd(this, false);
                 }
             }
             catch (Error | IgniteCheckedException | RuntimeException e) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6629012/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java
index 42586d2..6a8117f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java
@@ -156,10 +156,4 @@ public interface IgniteCacheObjectProcessor extends GridProcessor {
      *          with {@link IgniteImmutable} annotation.
      */
     public boolean immutable(Object obj);
-
-    /**
-     * @param cacheName Cache name.
-     * @return {@code True} if portable format should be preserved when passing values to cache store.
-     */
-    public boolean keepPortableInStore(@Nullable String cacheName);
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6629012/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java
index f65b7bd..6e46757 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java
@@ -234,11 +234,6 @@ public class IgniteCacheObjectProcessorImpl extends GridProcessorAdapter impleme
     }
 
     /** {@inheritDoc} */
-    @Override public boolean keepPortableInStore(@Nullable String cacheName) {
-        return false;
-    }
-
-    /** {@inheritDoc} */
     @Override public void onCacheProcessorStarted() {
         // No-op.
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6629012/modules/core/src/main/java/org/apache/ignite/internal/processors/plugin/CachePluginManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/plugin/CachePluginManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/plugin/CachePluginManager.java
index 69ca1ae..580ff49 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/plugin/CachePluginManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/plugin/CachePluginManager.java
@@ -22,6 +22,7 @@ import org.apache.ignite.configuration.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.dr.*;
+import org.apache.ignite.internal.processors.cache.store.*;
 import org.apache.ignite.plugin.*;
 
 import java.util.*;
@@ -76,10 +77,13 @@ public class CachePluginManager extends GridCacheManagerAdapter {
     /**
      * Creates optional component.
      *
+     * @param ctx Kernal context.
+     * @param cfg Cache configuration.
      * @param cls Component class.
      * @return Created component.
      */
-    public <T> T createComponent(Class<T> cls) {
+    @SuppressWarnings("unchecked")
+    public <T> T createComponent(GridKernalContext ctx, CacheConfiguration cfg, Class<T> cls) {
         for (CachePluginProvider provider : providers) {
             T res = (T)provider.createComponent(cls);
             
@@ -91,6 +95,8 @@ public class CachePluginManager extends GridCacheManagerAdapter {
             return (T)new GridOsCacheDrManager();
         else if (cls.equals(CacheConflictResolutionManager.class))
             return (T)new CacheOsConflictResolutionManager<>();
+        else if (cls.equals(CacheStoreManager.class))
+            return (T)new CacheOsStoreManager(ctx, cfg);
 
         throw new IgniteException("Unsupported component type: " + cls);
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6629012/modules/core/src/main/resources/META-INF/classnames.properties
----------------------------------------------------------------------
diff --git a/modules/core/src/main/resources/META-INF/classnames.properties b/modules/core/src/main/resources/META-INF/classnames.properties
index 55c1f9d..d7213a4 100644
--- a/modules/core/src/main/resources/META-INF/classnames.properties
+++ b/modules/core/src/main/resources/META-INF/classnames.properties
@@ -389,10 +389,10 @@ org.apache.ignite.internal.processors.cache.GridCacheProcessor$LocalAffinityFunc
 org.apache.ignite.internal.processors.cache.GridCacheProjectionImpl
 org.apache.ignite.internal.processors.cache.GridCacheProxyImpl
 org.apache.ignite.internal.processors.cache.GridCacheReturn
-org.apache.ignite.internal.processors.cache.GridCacheStoreManager$1
-org.apache.ignite.internal.processors.cache.GridCacheStoreManager$2
-org.apache.ignite.internal.processors.cache.GridCacheStoreManager$3
-org.apache.ignite.internal.processors.cache.GridCacheStoreManager$4
+org.apache.ignite.internal.processors.cache.store.GridCacheStoreManagerAdapter$1
+org.apache.ignite.internal.processors.cache.store.GridCacheStoreManagerAdapter$2
+org.apache.ignite.internal.processors.cache.store.GridCacheStoreManagerAdapter$3
+org.apache.ignite.internal.processors.cache.store.GridCacheStoreManagerAdapter$4
 org.apache.ignite.internal.processors.cache.GridCacheSwapManager$10
 org.apache.ignite.internal.processors.cache.GridCacheSwapManager$12
 org.apache.ignite.internal.processors.cache.GridCacheSwapManager$14
@@ -433,9 +433,9 @@ org.apache.ignite.internal.processors.cache.GridCacheUtils$8
 org.apache.ignite.internal.processors.cache.GridCacheUtils$9
 org.apache.ignite.internal.processors.cache.GridCacheValueCollection
 org.apache.ignite.internal.processors.cache.GridCacheValueCollection$1
-org.apache.ignite.internal.processors.cache.GridCacheWriteBehindStore$StatefulValue
-org.apache.ignite.internal.processors.cache.GridCacheWriteBehindStore$StoreOperation
-org.apache.ignite.internal.processors.cache.GridCacheWriteBehindStore$ValueStatus
+org.apache.ignite.internal.processors.cache.store.GridCacheWriteBehindStore$StatefulValue
+org.apache.ignite.internal.processors.cache.store.GridCacheWriteBehindStore$StoreOperation
+org.apache.ignite.internal.processors.cache.store.GridCacheWriteBehindStore$ValueStatus
 org.apache.ignite.internal.processors.cache.IgniteCacheProxy
 org.apache.ignite.internal.processors.cache.IgniteCacheProxy$1
 org.apache.ignite.internal.processors.cache.IgniteCacheProxy$2

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6629012/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedWritesTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedWritesTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedWritesTest.java
index b9f9602..c4ba385 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedWritesTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedWritesTest.java
@@ -26,14 +26,13 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
 import org.apache.ignite.testframework.junits.common.*;
 import org.apache.ignite.transactions.*;
 
-import javax.cache.configuration.*;
 import java.util.concurrent.atomic.*;
 
 import static org.apache.ignite.cache.CacheAtomicityMode.*;
 
 /**
  * Test that in {@link CacheMode#PARTITIONED} mode cache writes values only to the near cache store. <p/> This check
- * is needed because in current implementation if {@link GridCacheWriteBehindStore} assumes that and user store is
+ * is needed because in current implementation if {@link org.apache.ignite.internal.processors.cache.store.GridCacheWriteBehindStore} assumes that and user store is
  * wrapped only in near cache (see {@link GridCacheProcessor} init logic).
  */
 @SuppressWarnings({"unchecked"})

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6629012/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStoreAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStoreAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStoreAbstractSelfTest.java
deleted file mode 100644
index 883a216..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStoreAbstractSelfTest.java
+++ /dev/null
@@ -1,189 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache;
-
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.testframework.junits.common.*;
-
-import java.util.*;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.*;
-
-/**
- * Harness for {@link GridCacheWriteBehindStore} tests.
- */
-public abstract class GridCacheWriteBehindStoreAbstractSelfTest extends GridCommonAbstractTest {
-    /** Write cache size. */
-    public static final int CACHE_SIZE = 1024;
-
-    /** Value dump interval. */
-    public static final int FLUSH_FREQUENCY = 1000;
-
-    /** Underlying store. */
-    protected GridCacheTestStore delegate = new GridCacheTestStore();
-
-    /** Tested store. */
-    protected GridCacheWriteBehindStore<Integer, String> store;
-
-    /** {@inheritDoc} */
-    @Override protected void afterTestsStopped() throws Exception {
-        delegate = null;
-        store = null;
-
-        super.afterTestsStopped();
-    }
-
-    /**
-     * Initializes store.
-     *
-     * @param flushThreadCnt Count of flush threads
-     * @throws Exception If failed.
-     */
-    protected void initStore(int flushThreadCnt) throws Exception {
-        store = new GridCacheWriteBehindStore<>(null, "", "", log, delegate);
-
-        store.setFlushFrequency(FLUSH_FREQUENCY);
-
-        store.setFlushSize(CACHE_SIZE);
-
-        store.setFlushThreadCount(flushThreadCnt);
-
-        delegate.reset();
-
-        store.start();
-    }
-
-    /**
-     * Shutdowns store.
-     *
-     * @throws Exception If failed.
-     */
-    protected void shutdownStore() throws Exception {
-        store.stop();
-
-        assertTrue("Store cache must be empty after shutdown", store.writeCache().isEmpty());
-    }
-
-    /**
-     * Performs multiple put, get and remove operations in several threads on a store. After
-     * all threads finished their operations, returns the total set of keys that should be
-     * in underlying store.
-     *
-     * @param threadCnt Count of threads that should update keys.
-     * @param keysPerThread Count of unique keys assigned to a thread.
-     * @return Set of keys that was totally put in store.
-     * @throws Exception If failed.
-     */
-    protected Set<Integer> runPutGetRemoveMultithreaded(int threadCnt, final int keysPerThread) throws Exception {
-        final ConcurrentMap<String, Set<Integer>> perThread = new ConcurrentHashMap<>();
-
-        final AtomicBoolean running = new AtomicBoolean(true);
-
-        final AtomicInteger cntr = new AtomicInteger();
-
-        final AtomicInteger operations = new AtomicInteger();
-
-        IgniteInternalFuture<?> fut = multithreadedAsync(new Runnable() {
-            @SuppressWarnings({"NullableProblems"})
-            @Override public void run() {
-                // Initialize key set for this thread.
-                Set<Integer> set = new HashSet<>();
-
-                Set<Integer> old = perThread.putIfAbsent(Thread.currentThread().getName(), set);
-
-                if (old != null)
-                    set = old;
-
-                List<Integer> original = new ArrayList<>();
-
-                Random rnd = new Random();
-
-                for (int i = 0; i < keysPerThread; i++)
-                    original.add(cntr.getAndIncrement());
-
-                try {
-                    while (running.get()) {
-                        int op = rnd.nextInt(3);
-                        int idx = rnd.nextInt(keysPerThread);
-
-                        int key = original.get(idx);
-
-                        switch (op) {
-                            case 0:
-                                store.write(new CacheEntryImpl<>(key, "val" + key));
-                                set.add(key);
-
-                                operations.incrementAndGet();
-
-                                break;
-
-                            case 1:
-                                store.delete(key);
-                                set.remove(key);
-
-                                operations.incrementAndGet();
-
-                                break;
-
-                            case 2:
-                            default:
-                                store.write(new CacheEntryImpl<>(key, "broken"));
-
-                                String val = store.load(key);
-
-                                assertEquals("Invalid intermediate value: " + val, "broken", val);
-
-                                store.write(new CacheEntryImpl<>(key, "val" + key));
-
-                                set.add(key);
-
-                                // 2 put operations performed here.
-                                operations.incrementAndGet();
-                                operations.incrementAndGet();
-                                operations.incrementAndGet();
-
-                                break;
-                        }
-                    }
-                }
-                catch (Exception e) {
-                    error("Unexpected exception in put thread", e);
-
-                    assert false;
-                }
-            }
-        }, threadCnt, "put");
-
-        U.sleep(10000);
-
-        running.set(false);
-
-        fut.get();
-
-        log().info(">>> " + operations + " operations performed totally");
-
-        Set<Integer> total = new HashSet<>();
-
-        for (Set<Integer> threadVals : perThread.values()) {
-            total.addAll(threadVals);
-        }
-
-        return total;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6629012/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStoreAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStoreAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStoreAbstractTest.java
deleted file mode 100644
index 305b8bb..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStoreAbstractTest.java
+++ /dev/null
@@ -1,349 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cache.*;
-import org.apache.ignite.configuration.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.spi.discovery.tcp.*;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
-import org.apache.ignite.testframework.junits.common.*;
-import org.apache.ignite.transactions.*;
-import org.jetbrains.annotations.*;
-
-import javax.cache.configuration.*;
-import java.util.*;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.*;
-
-import static org.apache.ignite.cache.CacheAtomicityMode.*;
-import static org.apache.ignite.transactions.TransactionConcurrency.*;
-import static org.apache.ignite.transactions.TransactionIsolation.*;
-
-/**
- * Basic store test.
- */
-public abstract class GridCacheWriteBehindStoreAbstractTest extends GridCommonAbstractTest {
-    /** Flush frequency. */
-    private static final int WRITE_FROM_BEHIND_FLUSH_FREQUENCY = 1000;
-
-    /** Cache store. */
-    private static final GridCacheTestStore store = new GridCacheTestStore();
-
-    /**
-     *
-     */
-    protected GridCacheWriteBehindStoreAbstractTest() {
-        super(true /*start grid. */);
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void beforeTest() throws Exception {
-        store.resetTimestamp();
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void afterTest() throws Exception {
-        IgniteCache<?, ?> cache = jcache();
-
-        if (cache != null)
-            cache.clear();
-
-        store.reset();
-    }
-
-    /** @return Caching mode. */
-    protected abstract CacheMode cacheMode();
-
-    /** {@inheritDoc} */
-    @SuppressWarnings("unchecked")
-    @Override protected final IgniteConfiguration getConfiguration() throws Exception {
-        IgniteConfiguration c = super.getConfiguration();
-
-        TcpDiscoverySpi disco = new TcpDiscoverySpi();
-
-        disco.setIpFinder(new TcpDiscoveryVmIpFinder(true));
-
-        c.setDiscoverySpi(disco);
-
-        CacheConfiguration cc = defaultCacheConfiguration();
-
-        cc.setCacheMode(cacheMode());
-        cc.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
-        cc.setSwapEnabled(false);
-        cc.setAtomicityMode(TRANSACTIONAL);
-
-        cc.setCacheStoreFactory(singletonFactory(store));
-        cc.setReadThrough(true);
-        cc.setWriteThrough(true);
-        cc.setLoadPreviousValue(true);
-
-        cc.setWriteBehindEnabled(true);
-        cc.setWriteBehindFlushFrequency(WRITE_FROM_BEHIND_FLUSH_FREQUENCY);
-
-        c.setCacheConfiguration(cc);
-
-        return c;
-    }
-
-    /** @throws Exception If test fails. */
-    public void testWriteThrough() throws Exception {
-        IgniteCache<Integer, String> cache = jcache();
-
-        Map<Integer, String> map = store.getMap();
-
-        assert map.isEmpty();
-
-        Transaction tx = grid().transactions().txStart(OPTIMISTIC, REPEATABLE_READ);
-
-        try {
-            for (int i = 1; i <= 10; i++) {
-                cache.put(i, Integer.toString(i));
-
-                checkLastMethod(null);
-            }
-
-            tx.commit();
-        }
-        finally {
-            tx.close();
-        }
-
-        // Need to wait WFB flush timeout.
-        U.sleep(WRITE_FROM_BEHIND_FLUSH_FREQUENCY + 100);
-
-        checkLastMethod("putAll");
-
-        assert cache.size() == 10;
-
-        for (int i = 1; i <= 10; i++) {
-            String val = map.get(i);
-
-            assert val != null;
-            assert val.equals(Integer.toString(i));
-        }
-
-        store.resetLastMethod();
-
-        tx = grid().transactions().txStart();
-
-        try {
-            for (int i = 1; i <= 10; i++) {
-                String val = cache.getAndRemove(i);
-
-                checkLastMethod(null);
-
-                assert val != null;
-                assert val.equals(Integer.toString(i));
-            }
-
-            tx.commit();
-        }
-        finally {
-            tx.close();
-        }
-
-        // Need to wait WFB flush timeout.
-        U.sleep(WRITE_FROM_BEHIND_FLUSH_FREQUENCY + 100);
-
-        checkLastMethod("removeAll");
-
-        assert map.isEmpty();
-    }
-
-    /** @throws Exception If test failed. */
-    public void testReadThrough() throws Exception {
-        IgniteCache<Integer, String> cache = jcache();
-
-        Map<Integer, String> map = store.getMap();
-
-        assert map.isEmpty();
-
-        try (Transaction tx = grid().transactions().txStart(OPTIMISTIC, REPEATABLE_READ)) {
-            for (int i = 1; i <= 10; i++)
-                cache.put(i, Integer.toString(i));
-
-            checkLastMethod(null);
-
-            tx.commit();
-        }
-
-        // Need to wait WFB flush timeout.
-        U.sleep(WRITE_FROM_BEHIND_FLUSH_FREQUENCY + 100);
-
-        checkLastMethod("putAll");
-
-        for (int i = 1; i <= 10; i++) {
-            String val = map.get(i);
-
-            assert val != null;
-            assert val.equals(Integer.toString(i));
-        }
-
-        cache.clear();
-
-        assert cache.localSize() == 0;
-        assert cache.localSize() == 0;
-
-        // Need to wait WFB flush timeout.
-        U.sleep(WRITE_FROM_BEHIND_FLUSH_FREQUENCY + 100);
-
-        assert map.size() == 10;
-
-        for (int i = 1; i <= 10; i++) {
-            // Read through.
-            String val = cache.get(i);
-
-            checkLastMethod("load");
-
-            assert val != null;
-            assert val.equals(Integer.toString(i));
-        }
-
-        assert cache.size() == 10;
-
-        cache.clear();
-
-        assert cache.localSize() == 0;
-        assert cache.localSize() == 0;
-
-        assert map.size() == 10;
-
-        Set<Integer> keys = new HashSet<>();
-
-        for (int i = 1; i <= 10; i++)
-            keys.add(i);
-
-        // Read through.
-        Map<Integer, String> vals = cache.getAll(keys);
-
-        checkLastMethod("loadAll");
-
-        assert vals != null;
-        assert vals.size() == 10;
-
-        for (int i = 1; i <= 10; i++) {
-            String val = vals.get(i);
-
-            assert val != null;
-            assert val.equals(Integer.toString(i));
-        }
-
-        // Write through.
-        cache.removeAll(keys);
-
-        // Need to wait WFB flush timeout.
-        U.sleep(WRITE_FROM_BEHIND_FLUSH_FREQUENCY + 100);
-
-        checkLastMethod("removeAll");
-
-        assert cache.localSize() == 0;
-        assert cache.localSize() == 0;
-
-        assert map.isEmpty();
-    }
-
-    /** @throws Exception If failed. */
-    public void testMultithreaded() throws Exception {
-        final ConcurrentMap<String, Set<Integer>> perThread = new ConcurrentHashMap<>();
-
-        final AtomicBoolean running = new AtomicBoolean(true);
-
-        final IgniteCache<Integer, String> cache = jcache();
-
-        IgniteInternalFuture<?> fut = multithreadedAsync(new Runnable() {
-            @SuppressWarnings({"NullableProblems"})
-            @Override public void run() {
-                // Initialize key set for this thread.
-                Set<Integer> set = new HashSet<>();
-
-                Set<Integer> old = perThread.putIfAbsent(Thread.currentThread().getName(), set);
-
-                if (old != null)
-                    set = old;
-
-                Random rnd = new Random();
-
-                int keyCnt = 20000;
-
-                while (running.get()) {
-                    int op = rnd.nextInt(2);
-                    int key = rnd.nextInt(keyCnt);
-
-                    switch (op) {
-                        case 0:
-                            cache.put(key, "val" + key);
-                            set.add(key);
-
-                            break;
-
-                        case 1:
-                        default:
-                            cache.remove(key);
-                            set.remove(key);
-
-                            break;
-                    }
-                }
-            }
-        }, 10, "put");
-
-        U.sleep(10000);
-
-        running.set(false);
-
-        fut.get();
-
-        U.sleep(5 * WRITE_FROM_BEHIND_FLUSH_FREQUENCY);
-
-        Map<Integer, String> stored = store.getMap();
-
-        for (Map.Entry<Integer, String> entry : stored.entrySet()) {
-            int key = entry.getKey();
-
-            assertEquals("Invalid value for key " + key, "val" + key, entry.getValue());
-
-            boolean found = false;
-
-            for (Set<Integer> threadPuts : perThread.values()) {
-                if (threadPuts.contains(key)) {
-                    found = true;
-
-                    break;
-                }
-            }
-
-            assert found : "No threads found that put key " + key;
-        }
-    }
-
-    /** @param mtd Expected last method value. */
-    private void checkLastMethod(@Nullable String mtd) {
-        String lastMtd = store.getLastMethod();
-
-        if (mtd == null)
-            assert lastMtd == null : "Last method must be null: " + lastMtd;
-        else {
-            assert lastMtd != null : "Last method must be not null";
-            assert lastMtd.equals(mtd) : "Last method does not match [expected=" + mtd + ", lastMtd=" + lastMtd + ']';
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6629012/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStoreLocalTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStoreLocalTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStoreLocalTest.java
deleted file mode 100644
index 6c050ca..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStoreLocalTest.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache;
-
-import org.apache.ignite.cache.*;
-
-/**
- * Tests {@link GridCacheWriteBehindStore} in grid configuration.
- */
-public class GridCacheWriteBehindStoreLocalTest extends GridCacheWriteBehindStoreAbstractTest {
-    /** {@inheritDoc} */
-    @Override protected CacheMode cacheMode() {
-        return CacheMode.LOCAL;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6629012/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStoreMultithreadedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStoreMultithreadedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStoreMultithreadedSelfTest.java
deleted file mode 100644
index 9607784..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStoreMultithreadedSelfTest.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache;
-
-import org.apache.ignite.internal.util.typedef.internal.*;
-
-import java.util.*;
-
-/**
- * Multithreaded tests for {@link GridCacheWriteBehindStore}.
- */
-public class GridCacheWriteBehindStoreMultithreadedSelfTest extends GridCacheWriteBehindStoreAbstractSelfTest {
-    /**
-     * This test performs complex set of operations on store from multiple threads.
-     *
-     * @throws Exception If failed.
-     */
-    public void testPutGetRemove() throws Exception {
-        initStore(2);
-
-        Set<Integer> exp;
-
-        try {
-            exp = runPutGetRemoveMultithreaded(10, 10);
-        }
-        finally {
-            shutdownStore();
-        }
-
-        Map<Integer, String> map = delegate.getMap();
-
-        Collection<Integer> extra = new HashSet<>(map.keySet());
-
-        extra.removeAll(exp);
-
-        assertTrue("The underlying store contains extra keys: " + extra, extra.isEmpty());
-
-        Collection<Integer> missing = new HashSet<>(exp);
-
-        missing.removeAll(map.keySet());
-
-        assertTrue("Missing keys in the underlying store: " + missing, missing.isEmpty());
-
-        for (Integer key : exp)
-            assertEquals("Invalid value for key " + key, "val" + key, map.get(key));
-    }
-
-    /**
-     * Tests that cache would keep values if underlying store fails.
-     *
-     * @throws Exception If failed.
-     */
-    public void testStoreFailure() throws Exception {
-        delegate.setShouldFail(true);
-
-        initStore(2);
-
-        Set<Integer> exp;
-
-        try {
-            exp = runPutGetRemoveMultithreaded(10, 10);
-
-            U.sleep(FLUSH_FREQUENCY);
-
-            info(">>> There are " + store.getWriteBehindErrorRetryCount() + " entries in RETRY state");
-
-            delegate.setShouldFail(false);
-
-            // Despite that we set shouldFail flag to false, flush thread may just have caught an exception.
-            // If we move store to the stopping state right away, this value will be lost. That's why this sleep
-            // is inserted here to let all exception handlers in write-behind store exit.
-            U.sleep(1000);
-        }
-        finally {
-            shutdownStore();
-        }
-
-        Map<Integer, String> map = delegate.getMap();
-
-        Collection<Integer> extra = new HashSet<>(map.keySet());
-
-        extra.removeAll(exp);
-
-        assertTrue("The underlying store contains extra keys: " + extra, extra.isEmpty());
-
-        Collection<Integer> missing = new HashSet<>(exp);
-
-        missing.removeAll(map.keySet());
-
-        assertTrue("Missing keys in the underlying store: " + missing, missing.isEmpty());
-
-        for (Integer key : exp)
-            assertEquals("Invalid value for key " + key, "val" + key, map.get(key));
-    }
-
-    /**
-     * Tests store consistency in case of high put rate, when flush is performed from the same thread
-     * as put or remove operation.
-     *
-     * @throws Exception If failed.
-     */
-    public void testFlushFromTheSameThread() throws Exception {
-        // 50 milliseconds should be enough.
-        delegate.setOperationDelay(50);
-
-        initStore(2);
-
-        Set<Integer> exp;
-
-        int start = store.getWriteBehindTotalCriticalOverflowCount();
-
-        try {
-            //We will have in total 5 * CACHE_SIZE keys that should be enough to grow map size to critical value.
-            exp = runPutGetRemoveMultithreaded(5, CACHE_SIZE);
-        }
-        finally {
-            log.info(">>> Done inserting, shutting down the store");
-
-            shutdownStore();
-        }
-
-        // Restore delay.
-        delegate.setOperationDelay(0);
-
-        Map<Integer, String> map = delegate.getMap();
-
-        int end = store.getWriteBehindTotalCriticalOverflowCount();
-
-        log.info(">>> There are " + exp.size() + " keys in store, " + (end - start) + " overflows detected");
-
-        assertTrue("No cache overflows detected (a bug or too few keys or too few delay?)", end > start);
-
-        Collection<Integer> extra = new HashSet<>(map.keySet());
-
-        extra.removeAll(exp);
-
-        assertTrue("The underlying store contains extra keys: " + extra, extra.isEmpty());
-
-        Collection<Integer> missing = new HashSet<>(exp);
-
-        missing.removeAll(map.keySet());
-
-        assertTrue("Missing keys in the underlying store: " + missing, missing.isEmpty());
-
-        for (Integer key : exp)
-            assertEquals("Invalid value for key " + key, "val" + key, map.get(key));
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6629012/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStorePartitionedMultiNodeSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStorePartitionedMultiNodeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStorePartitionedMultiNodeSelfTest.java
deleted file mode 100644
index 8fb4f68..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStorePartitionedMultiNodeSelfTest.java
+++ /dev/null
@@ -1,215 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cache.*;
-import org.apache.ignite.cache.store.*;
-import org.apache.ignite.configuration.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.spi.discovery.tcp.*;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
-import org.apache.ignite.testframework.junits.common.*;
-import org.apache.ignite.transactions.*;
-
-import java.util.*;
-
-import static org.apache.ignite.cache.CacheAtomicityMode.*;
-import static org.apache.ignite.transactions.TransactionConcurrency.*;
-import static org.apache.ignite.transactions.TransactionIsolation.*;
-
-/**
- * Tests write-behind store with near and dht commit option.
- */
-public class GridCacheWriteBehindStorePartitionedMultiNodeSelfTest extends GridCommonAbstractTest {
-    /** Grids to start. */
-    private static final int GRID_CNT = 5;
-
-    /** Ip finder. */
-    private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
-
-    /** Flush frequency. */
-    public static final int WRITE_BEHIND_FLUSH_FREQ = 1000;
-
-    /** Stores per grid. */
-    private GridCacheTestStore[] stores = new GridCacheTestStore[GRID_CNT];
-
-    /** Start grid counter. */
-    private int idx;
-
-    /** {@inheritDoc} */
-    @SuppressWarnings("unchecked")
-    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
-        IgniteConfiguration c = super.getConfiguration(gridName);
-
-        TcpDiscoverySpi disco = new TcpDiscoverySpi();
-
-        disco.setIpFinder(ipFinder);
-
-        c.setDiscoverySpi(disco);
-
-        CacheConfiguration cc = defaultCacheConfiguration();
-
-        cc.setCacheMode(CacheMode.PARTITIONED);
-        cc.setWriteBehindEnabled(true);
-        cc.setWriteBehindFlushFrequency(WRITE_BEHIND_FLUSH_FREQ);
-        cc.setAtomicityMode(TRANSACTIONAL);
-        cc.setNearConfiguration(new NearCacheConfiguration());
-
-        CacheStore store = stores[idx] = new GridCacheTestStore();
-
-        cc.setCacheStoreFactory(singletonFactory(store));
-        cc.setReadThrough(true);
-        cc.setWriteThrough(true);
-        cc.setLoadPreviousValue(true);
-
-        c.setCacheConfiguration(cc);
-
-        idx++;
-
-        return c;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void afterTestsStopped() throws Exception {
-        stores = null;
-
-        super.afterTestsStopped();
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    private void prepare() throws Exception {
-        idx = 0;
-
-        startGrids(GRID_CNT);
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void afterTest() throws Exception {
-        super.afterTest();
-
-        stopAllGrids();
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testSingleWritesOnDhtNode() throws Exception {
-        checkSingleWrites();
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testBatchWritesOnDhtNode() throws Exception {
-        checkBatchWrites();
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testTxWritesOnDhtNode() throws Exception {
-        checkTxWrites();
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    private void checkSingleWrites() throws Exception {
-        prepare();
-
-        IgniteCache<Integer, String> cache = grid(0).cache(null);
-
-        for (int i = 0; i < 100; i++)
-            cache.put(i, String.valueOf(i));
-
-        checkWrites();
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    private void checkBatchWrites() throws Exception {
-        prepare();
-
-        Map<Integer, String> map = new HashMap<>();
-
-        for (int i = 0; i < 100; i++)
-            map.put(i, String.valueOf(i));
-
-        grid(0).cache(null).putAll(map);
-
-        checkWrites();
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    private void checkTxWrites() throws Exception {
-        prepare();
-
-        IgniteCache<Object, Object> cache = grid(0).cache(null);
-
-        try (Transaction tx = grid(0).transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
-            for (int i = 0; i < 100; i++)
-                cache.put(i, String.valueOf(i));
-
-            tx.commit();
-        }
-
-        checkWrites();
-    }
-
-    /**
-     * @throws IgniteInterruptedCheckedException If sleep was interrupted.
-     */
-    private void checkWrites() throws IgniteInterruptedCheckedException {
-        U.sleep(WRITE_BEHIND_FLUSH_FREQ * 2);
-
-        Collection<Integer> allKeys = new ArrayList<>(100);
-
-        for (int i = 0; i < GRID_CNT; i++) {
-            Map<Integer,String> map = stores[i].getMap();
-
-            assertFalse("Missing writes for node: " + i, map.isEmpty());
-
-            allKeys.addAll(map.keySet());
-
-            // Check there is no intersection.
-            for (int j = 0; j < GRID_CNT; j++) {
-                if (i == j)
-                    continue;
-
-                Collection<Integer> intersection = new HashSet<>(stores[j].getMap().keySet());
-
-                intersection.retainAll(map.keySet());
-
-                assertTrue(intersection.isEmpty());
-            }
-        }
-
-        assertEquals(100, allKeys.size());
-
-        for (int i = 0; i < 100; i++)
-            assertTrue(allKeys.contains(i));
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6629012/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStorePartitionedTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStorePartitionedTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStorePartitionedTest.java
deleted file mode 100644
index f9e454f..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStorePartitionedTest.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache;
-
-import org.apache.ignite.cache.*;
-
-/**
- * Tests {@link GridCacheWriteBehindStore} in partitioned configuration.
- */
-public class GridCacheWriteBehindStorePartitionedTest extends GridCacheWriteBehindStoreAbstractTest {
-    /** {@inheritDoc} */
-    @Override protected CacheMode cacheMode() {
-        return CacheMode.PARTITIONED;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6629012/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStoreReplicatedTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStoreReplicatedTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStoreReplicatedTest.java
deleted file mode 100644
index c809f90..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStoreReplicatedTest.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache;
-
-import org.apache.ignite.cache.*;
-
-/**
- * Tests {@link GridCacheWriteBehindStore} in grid configuration.
- */
-public class GridCacheWriteBehindStoreReplicatedTest extends GridCacheWriteBehindStoreAbstractTest {
-    /** {@inheritDoc} */
-    @Override protected CacheMode cacheMode() {
-        return CacheMode.REPLICATED;
-    }
-}


Mime
View raw message