ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [2/5] incubator-ignite git commit: # GG-9973: WIP.
Date Thu, 02 Apr 2015 12:54:38 GMT
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/29377e9e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheOsStoreManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheOsStoreManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheOsStoreManager.java
new file mode 100644
index 0000000..9fdbd8e
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheOsStoreManager.java
@@ -0,0 +1,1202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.store;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.store.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.processors.cache.transactions.*;
+import org.apache.ignite.internal.processors.cache.version.*;
+import org.apache.ignite.internal.util.*;
+import org.apache.ignite.internal.util.lang.*;
+import org.apache.ignite.internal.util.tostring.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.lifecycle.*;
+import org.apache.ignite.transactions.*;
+import org.jetbrains.annotations.*;
+
+import javax.cache.*;
+import javax.cache.integration.*;
+import java.util.*;
+
+/**
+ * Store manager.
+ */
+@SuppressWarnings("AssignmentToCatchBlockParameter")
+public class GridCacheOsStoreManager extends GridCacheManagerAdapter {
+    /** */
+    private static final UUID SES_ATTR = UUID.randomUUID();
+
+    /** */
+    private final CacheStore<Object, Object> store;
+
+    /** */
+    private final CacheStore<?, ?> cfgStore;
+
+    /** */
+    private final CacheStoreBalancingWrapper<Object, Object> singleThreadGate;
+
+    /** */
+    private final ThreadLocal<SessionData> sesHolder;
+
+    /** */
+    private final boolean locStore;
+
+    /** */
+    private final boolean writeThrough;
+
+    /** */
+    private boolean convertPortable;
+
+    /**
+     * @param ctx Kernal context.
+     * @param sesHolders Session holders map to use the same session holder for different managers if they use
+     *        the same store instance.
+     * @param cfgStore Store provided in configuration.
+     * @param cfg Cache configuration.
+     * @throws IgniteCheckedException In case of error.
+     */
+    @SuppressWarnings("unchecked")
+    public GridCacheOsStoreManager(
+        GridKernalContext ctx,
+        Map<CacheStore, ThreadLocal> sesHolders,
+        @Nullable CacheStore<Object, Object> cfgStore,
+        CacheConfiguration cfg
+    ) throws IgniteCheckedException {
+        this.cfgStore = cfgStore;
+
+        store = cacheStoreWrapper(ctx, cfgStore, cfg);
+
+        singleThreadGate = store == null ? null : new CacheStoreBalancingWrapper<>(store);
+
+        writeThrough = cfg.isWriteThrough();
+
+        ThreadLocal<SessionData> sesHolder0 = null;
+
+        if (cfgStore != null) {
+            sesHolder0 = sesHolders.get(cfgStore);
+
+            if (sesHolder0 == null) {
+                ThreadLocalSession locSes = new ThreadLocalSession();
+
+                if (ctx.resource().injectStoreSession(cfgStore, locSes)) {
+                    sesHolder0 = locSes.sesHolder;
+
+                    sesHolders.put(cfgStore, sesHolder0);
+                }
+            }
+        }
+
+        sesHolder = sesHolder0;
+
+        locStore = U.hasAnnotation(cfgStore, CacheLocalStore.class);
+    }
+
+    /**
+     * @return {@code True} is write-through is enabled.
+     */
+    public boolean writeThrough() {
+        return writeThrough;
+    }
+
+    /**
+     * @return Unwrapped store provided in configuration.
+     */
+    public CacheStore<?, ?> configuredStore() {
+        return cfgStore;
+    }
+
+    /**
+     * Creates a wrapped cache store if write-behind cache is configured.
+     *
+     * @param ctx Kernal context.
+     * @param cfgStore Store provided in configuration.
+     * @param cfg Cache configuration.
+     * @return Instance if {@link GridCacheWriteBehindStore} if write-behind store is configured,
+     *         or user-defined cache store.
+     */
+    @SuppressWarnings({"unchecked"})
+    private CacheStore cacheStoreWrapper(GridKernalContext ctx,
+        @Nullable CacheStore cfgStore,
+        CacheConfiguration cfg) {
+        if (cfgStore == null || !cfg.isWriteBehindEnabled())
+            return cfgStore;
+
+        GridCacheWriteBehindStore store = new GridCacheWriteBehindStore(this,
+            ctx.gridName(),
+            cfg.getName(),
+            ctx.log(GridCacheWriteBehindStore.class),
+            cfgStore);
+
+        store.setFlushSize(cfg.getWriteBehindFlushSize());
+        store.setFlushThreadCount(cfg.getWriteBehindFlushThreadCount());
+        store.setFlushFrequency(cfg.getWriteBehindFlushFrequency());
+        store.setBatchSize(cfg.getWriteBehindBatchSize());
+
+        return store;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void start0() throws IgniteCheckedException {
+        if (store instanceof LifecycleAware) {
+            try {
+                // Avoid second start() call on store in case when near cache is enabled.
+                if (cctx.config().isWriteBehindEnabled()) {
+                    if (!cctx.isNear())
+                        ((LifecycleAware)store).start();
+                }
+            }
+            catch (Exception e) {
+                throw new IgniteCheckedException("Failed to start cache store: " + e, e);
+            }
+        }
+
+        convertPortable = !cctx.cacheObjects().keepPortableInStore(cctx.name());
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void stop0(boolean cancel) {
+        if (store instanceof LifecycleAware) {
+            try {
+                // Avoid second start() call on store in case when near cache is enabled.
+                if (cctx.config().isWriteBehindEnabled()) {
+                    if (!cctx.isNear())
+                        ((LifecycleAware)store).stop();
+                }
+            }
+            catch (Exception e) {
+                U.error(log(), "Failed to stop cache store.", e);
+            }
+        }
+    }
+
+    /**
+     * @return Convert-portable flag.
+     */
+    public boolean convertPortable() {
+        return convertPortable;
+    }
+
+    /**
+     * @param convertPortable Convert-portable flag.
+     */
+    public void convertPortable(boolean convertPortable) {
+        this.convertPortable = convertPortable;
+    }
+
+    /**
+     * @return {@code true} If local store is configured.
+     */
+    public boolean isLocalStore() {
+        return locStore;
+    }
+
+    /**
+     * @return {@code true} If store configured.
+     */
+    public boolean configured() {
+        return store != null;
+    }
+
+    /**
+     * Loads data from persistent store.
+     *
+     * @param tx Cache transaction.
+     * @param key Cache key.
+     * @return Loaded value, possibly <tt>null</tt>.
+     * @throws IgniteCheckedException If data loading failed.
+     */
+    @SuppressWarnings("unchecked")
+    @Nullable public Object loadFromStore(@Nullable IgniteInternalTx tx, KeyCacheObject key)
+        throws IgniteCheckedException {
+        return loadFromStore(tx, key, true);
+    }
+
+    /**
+     * Loads data from persistent store.
+     *
+     * @param tx Cache transaction.
+     * @param key Cache key.
+     * @param convert Convert flag.
+     * @return Loaded value, possibly <tt>null</tt>.
+     * @throws IgniteCheckedException If data loading failed.
+     */
+    @SuppressWarnings("unchecked")
+    @Nullable private Object loadFromStore(@Nullable IgniteInternalTx tx,
+        KeyCacheObject key,
+        boolean convert)
+        throws IgniteCheckedException {
+        if (store != null) {
+            if (key.internal())
+                // Never load internal keys from store as they are never persisted.
+                return null;
+
+            Object storeKey = key.value(cctx.cacheObjectContext(), false);
+
+            if (convertPortable)
+                storeKey = cctx.unwrapPortableIfNeeded(storeKey, false);
+
+            if (log.isDebugEnabled())
+                log.debug("Loading value from store for key: " + storeKey);
+
+            initSession(tx);
+
+            boolean thewEx = true;
+
+            Object val = null;
+
+            try {
+                val = singleThreadGate.load(storeKey);
+
+                thewEx = false;
+            }
+            catch (ClassCastException e) {
+                handleClassCastException(e);
+            }
+            catch (CacheLoaderException e) {
+                throw new IgniteCheckedException(e);
+            }
+            catch (Exception e) {
+                throw new IgniteCheckedException(new CacheLoaderException(e));
+            }
+            finally {
+                endSession(tx, thewEx);
+            }
+
+            if (log.isDebugEnabled())
+                log.debug("Loaded value from store [key=" + key + ", val=" + val + ']');
+
+            if (convert) {
+                val = convert(val);
+
+                return val;
+            }
+            else
+                return val;
+        }
+
+        return null;
+    }
+
+    /**
+     * @param val Internal value.
+     * @return User value.
+     */
+    @SuppressWarnings("unchecked")
+    private Object convert(Object val) {
+        if (val == null)
+            return null;
+
+        return locStore ? ((IgniteBiTuple<Object, GridCacheVersion>)val).get1() : val;
+    }
+
+    /**
+     * @return Whether DHT transaction can write to store from DHT.
+     */
+    public boolean writeToStoreFromDht() {
+        return cctx.config().isWriteBehindEnabled() || locStore;
+    }
+
+    /**
+     * @param tx Cache transaction.
+     * @param keys Cache keys.
+     * @param vis Closure to apply for loaded elements.
+     * @throws IgniteCheckedException If data loading failed.
+     */
+    public void localStoreLoadAll(@Nullable IgniteInternalTx tx,
+        Collection<? extends KeyCacheObject> keys,
+        final GridInClosure3<KeyCacheObject, Object, GridCacheVersion> vis)
+        throws IgniteCheckedException {
+        assert store != null;
+        assert locStore;
+
+        loadAllFromStore(tx, keys, null, vis);
+    }
+
+    /**
+     * Loads data from persistent store.
+     *
+     * @param tx Cache transaction.
+     * @param keys Cache keys.
+     * @param vis Closure.
+     * @return {@code True} if there is a persistent storage.
+     * @throws IgniteCheckedException If data loading failed.
+     */
+    @SuppressWarnings({"unchecked"})
+    public boolean loadAllFromStore(@Nullable IgniteInternalTx tx,
+        Collection<? extends KeyCacheObject> keys,
+        final IgniteBiInClosure<KeyCacheObject, Object> vis) throws IgniteCheckedException {
+        if (store != null) {
+            loadAllFromStore(tx, keys, vis, null);
+
+            return true;
+        }
+        else {
+            for (KeyCacheObject key : keys)
+                vis.apply(key, null);
+        }
+
+        return false;
+    }
+
+    /**
+     * @param tx Cache transaction.
+     * @param keys Keys to load.
+     * @param vis Key/value closure (only one of vis or verVis can be specified).
+     * @param verVis Key/value/version closure (only one of vis or verVis can be specified).
+     * @throws IgniteCheckedException If failed.
+     */
+    @SuppressWarnings("unchecked")
+    private void loadAllFromStore(@Nullable IgniteInternalTx tx,
+        Collection<? extends KeyCacheObject> keys,
+        @Nullable final IgniteBiInClosure<KeyCacheObject, Object> vis,
+        @Nullable final GridInClosure3<KeyCacheObject, Object, GridCacheVersion> verVis)
+        throws IgniteCheckedException {
+        assert vis != null ^ verVis != null;
+        assert verVis == null || locStore;
+
+        final boolean convert = verVis == null;
+
+        if (!keys.isEmpty()) {
+            if (keys.size() == 1) {
+                KeyCacheObject key = F.first(keys);
+
+                if (convert)
+                    vis.apply(key, loadFromStore(tx, key));
+                else {
+                    IgniteBiTuple<Object, GridCacheVersion> t =
+                        (IgniteBiTuple<Object, GridCacheVersion>)loadFromStore(tx, key, false);
+
+                    if (t != null)
+                        verVis.apply(key, t.get1(), t.get2());
+                }
+
+                return;
+            }
+
+            Collection<Object> keys0;
+
+            if (convertPortable) {
+                keys0 = F.viewReadOnly(keys, new C1<KeyCacheObject, Object>() {
+                    @Override public Object apply(KeyCacheObject key) {
+                        return cctx.unwrapPortableIfNeeded(key.value(cctx.cacheObjectContext(), false), false);
+                    }
+                });
+            }
+            else {
+                keys0 = F.viewReadOnly(keys, new C1<KeyCacheObject, Object>() {
+                    @Override public Object apply(KeyCacheObject key) {
+                        return key.value(cctx.cacheObjectContext(), false);
+                    }
+                });
+            }
+
+            if (log.isDebugEnabled())
+                log.debug("Loading values from store for keys: " + keys0);
+
+            initSession(tx);
+
+            boolean thewEx = true;
+
+            try {
+                IgniteBiInClosure<Object, Object> c = new CI2<Object, Object>() {
+                    @SuppressWarnings("ConstantConditions")
+                    @Override public void apply(Object k, Object val) {
+                        if (convert) {
+                            Object v = convert(val);
+
+                            vis.apply(cctx.toCacheKeyObject(k), v);
+                        }
+                        else {
+                            IgniteBiTuple<Object, GridCacheVersion> v = (IgniteBiTuple<Object, GridCacheVersion>)val;
+
+                            if (v != null)
+                                verVis.apply(cctx.toCacheKeyObject(k), v.get1(), v.get2());
+                        }
+                    }
+                };
+
+                if (keys.size() > singleThreadGate.loadAllThreshold()) {
+                    Map<Object, Object> map = store.loadAll(keys0);
+
+                    if (map != null) {
+                        for (Map.Entry<Object, Object> e : map.entrySet())
+                            c.apply(cctx.toCacheKeyObject(e.getKey()), e.getValue());
+                    }
+                }
+                else
+                    singleThreadGate.loadAll(keys0, c);
+
+                thewEx = false;
+            }
+            catch (ClassCastException e) {
+                handleClassCastException(e);
+            }
+            catch (CacheLoaderException e) {
+                throw new IgniteCheckedException(e);
+            }
+            catch (Exception e) {
+                throw new IgniteCheckedException(new CacheLoaderException(e));
+            }
+            finally {
+                endSession(tx, thewEx);
+            }
+
+            if (log.isDebugEnabled())
+                log.debug("Loaded values from store for keys: " + keys0);
+        }
+    }
+
+    /**
+     * Loads data from persistent store.
+     *
+     * @param vis Closer to cache loaded elements.
+     * @param args User arguments.
+     * @return {@code True} if there is a persistent storage.
+     * @throws IgniteCheckedException If data loading failed.
+     */
+    @SuppressWarnings({"ErrorNotRethrown", "unchecked"})
+    public boolean loadCache(final GridInClosure3<KeyCacheObject, Object, GridCacheVersion> vis, Object[] args)
+        throws IgniteCheckedException {
+        if (store != null) {
+            if (log.isDebugEnabled())
+                log.debug("Loading all values from store.");
+
+            initSession(null);
+
+            boolean thewEx = true;
+
+            try {
+                store.loadCache(new IgniteBiInClosure<Object, Object>() {
+                    @Override public void apply(Object k, Object o) {
+                        Object v;
+                        GridCacheVersion ver = null;
+
+                        if (locStore) {
+                            IgniteBiTuple<Object, GridCacheVersion> t = (IgniteBiTuple<Object, GridCacheVersion>)o;
+
+                            v = t.get1();
+                            ver = t.get2();
+                        }
+                        else
+                            v = o;
+
+                        KeyCacheObject cacheKey = cctx.toCacheKeyObject(k);
+
+                        vis.apply(cacheKey, v, ver);
+                    }
+                }, args);
+
+                thewEx = false;
+            }
+            catch (CacheLoaderException e) {
+                throw new IgniteCheckedException(e);
+            }
+            catch (Exception e) {
+                throw new IgniteCheckedException(new CacheLoaderException(e));
+            }
+            finally {
+                endSession(null, thewEx);
+            }
+
+            if (log.isDebugEnabled())
+                log.debug("Loaded all values from store.");
+
+            return true;
+        }
+
+        LT.warn(log, null, "Calling Cache.loadCache() method will have no effect, " +
+            "CacheConfiguration.getStore() is not defined for cache: " + cctx.namexx());
+
+        return false;
+    }
+
+    /**
+     * Puts key-value pair into storage.
+     *
+     * @param tx Cache transaction.
+     * @param key Key.
+     * @param val Value.
+     * @param ver Version.
+     * @return {@code true} If there is a persistent storage.
+     * @throws IgniteCheckedException If storage failed.
+     */
+    @SuppressWarnings("unchecked")
+    public boolean putToStore(@Nullable IgniteInternalTx tx, Object key, Object val, GridCacheVersion ver)
+        throws IgniteCheckedException {
+        if (store != null) {
+            // Never persist internal keys.
+            if (key instanceof GridCacheInternal)
+                return true;
+
+            if (convertPortable) {
+                key = cctx.unwrapPortableIfNeeded(key, false);
+                val = cctx.unwrapPortableIfNeeded(val, false);
+            }
+
+            if (log.isDebugEnabled())
+                log.debug("Storing value in cache store [key=" + key + ", val=" + val + ']');
+
+            initSession(tx);
+
+            boolean thewEx = true;
+
+            try {
+                store.write(new CacheEntryImpl<>(key, locStore ? F.t(val, ver) : val));
+
+                thewEx = false;
+            }
+            catch (ClassCastException e) {
+                handleClassCastException(e);
+            }
+            catch (CacheWriterException e) {
+                throw new IgniteCheckedException(e);
+            }
+            catch (Exception e) {
+                throw new IgniteCheckedException(new CacheWriterException(e));
+            }
+            finally {
+                endSession(tx, thewEx);
+            }
+
+            if (log.isDebugEnabled())
+                log.debug("Stored value in cache store [key=" + key + ", val=" + val + ']');
+
+            return true;
+        }
+
+        return false;
+    }
+
+    /**
+     * Puts key-value pair into storage.
+     *
+     * @param tx Cache transaction.
+     * @param map Map.
+     * @return {@code True} if there is a persistent storage.
+     * @throws IgniteCheckedException If storage failed.
+     */
+    public boolean putAllToStore(@Nullable IgniteInternalTx tx,
+        Map<Object, IgniteBiTuple<Object, GridCacheVersion>> map)
+        throws IgniteCheckedException
+    {
+        if (F.isEmpty(map))
+            return true;
+
+        if (map.size() == 1) {
+            Map.Entry<Object, IgniteBiTuple<Object, GridCacheVersion>> e = map.entrySet().iterator().next();
+
+            return putToStore(tx, e.getKey(), e.getValue().get1(), e.getValue().get2());
+        }
+        else {
+            if (store != null) {
+                EntriesView entries = new EntriesView((Map)map);
+
+                if (log.isDebugEnabled())
+                    log.debug("Storing values in cache store [entries=" + entries + ']');
+
+                initSession(tx);
+
+                boolean thewEx = true;
+
+                try {
+                    store.writeAll(entries);
+
+                    thewEx = false;
+                }
+                catch (ClassCastException e) {
+                    handleClassCastException(e);
+                }
+                catch (Exception e) {
+                    if (!(e instanceof CacheWriterException))
+                        e = new CacheWriterException(e);
+
+                    if (!entries.isEmpty()) {
+                        List<Object> keys = new ArrayList<>(entries.size());
+
+                        for (Cache.Entry<?, ?> entry : entries)
+                            keys.add(entry.getKey());
+
+                        throw new CacheStorePartialUpdateException(keys, e);
+                    }
+
+                    throw new IgniteCheckedException(e);
+                }
+                finally {
+                    endSession(tx, thewEx);
+                }
+
+                if (log.isDebugEnabled())
+                    log.debug("Stored value in cache store [entries=" + entries + ']');
+
+                return true;
+            }
+
+            return false;
+        }
+    }
+
+    /**
+     * @param tx Cache transaction.
+     * @param key Key.
+     * @return {@code True} if there is a persistent storage.
+     * @throws IgniteCheckedException If storage failed.
+     */
+    @SuppressWarnings("unchecked")
+    public boolean removeFromStore(@Nullable IgniteInternalTx tx, Object key) throws IgniteCheckedException {
+        if (store != null) {
+            // Never remove internal key from store as it is never persisted.
+            if (key instanceof GridCacheInternal)
+                return false;
+
+            if (convertPortable)
+                key = cctx.unwrapPortableIfNeeded(key, false);
+
+            if (log.isDebugEnabled())
+                log.debug("Removing value from cache store [key=" + key + ']');
+
+            initSession(tx);
+
+            boolean thewEx = true;
+
+            try {
+                store.delete(key);
+
+                thewEx = false;
+            }
+            catch (ClassCastException e) {
+                handleClassCastException(e);
+            }
+            catch (CacheWriterException e) {
+                throw new IgniteCheckedException(e);
+            }
+            catch (Exception e) {
+                throw new IgniteCheckedException(new CacheWriterException(e));
+            }
+            finally {
+                endSession(tx, thewEx);
+            }
+
+            if (log.isDebugEnabled())
+                log.debug("Removed value from cache store [key=" + key + ']');
+
+            return true;
+        }
+
+        return false;
+    }
+
+    /**
+     * @param tx Cache transaction.
+     * @param keys Key.
+     * @return {@code True} if there is a persistent storage.
+     * @throws IgniteCheckedException If storage failed.
+     */
+    @SuppressWarnings("unchecked")
+    public boolean removeAllFromStore(@Nullable IgniteInternalTx tx, Collection<Object> keys)
+        throws IgniteCheckedException {
+        if (F.isEmpty(keys))
+            return true;
+
+        if (keys.size() == 1) {
+            Object key = keys.iterator().next();
+
+            return removeFromStore(tx, key);
+        }
+
+        if (store != null) {
+            Collection<Object> keys0 = convertPortable ? cctx.unwrapPortablesIfNeeded(keys, false) : keys;
+
+            if (log.isDebugEnabled())
+                log.debug("Removing values from cache store [keys=" + keys0 + ']');
+
+            initSession(tx);
+
+            boolean thewEx = true;
+
+            try {
+                store.deleteAll(keys0);
+
+                thewEx = false;
+            }
+            catch (ClassCastException e) {
+                handleClassCastException(e);
+            }
+            catch (Exception e) {
+                if (!(e instanceof CacheWriterException))
+                    e = new CacheWriterException(e);
+
+                if (!keys0.isEmpty())
+                    throw new CacheStorePartialUpdateException(keys0, e);
+
+                throw new IgniteCheckedException(e);
+            }
+            finally {
+                endSession(tx, thewEx);
+            }
+
+            if (log.isDebugEnabled())
+                log.debug("Removed values from cache store [keys=" + keys0 + ']');
+
+            return true;
+        }
+
+        return false;
+    }
+
+    /**
+     * @return Store.
+     */
+    public CacheStore<Object, Object> store() {
+        return store;
+    }
+
+    /**
+     * @throws IgniteCheckedException If failed.
+     */
+    public void forceFlush() throws IgniteCheckedException {
+        if (store instanceof GridCacheWriteBehindStore)
+            ((GridCacheWriteBehindStore)store).forceFlush();
+    }
+
+    /**
+     * @param tx Transaction.
+     * @param commit Commit.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void txEnd(IgniteInternalTx tx, boolean commit) throws IgniteCheckedException {
+        assert store != null;
+
+        initSession(tx);
+
+        try {
+            store.sessionEnd(commit);
+        }
+        finally {
+            if (sesHolder != null) {
+                sesHolder.set(null);
+
+                tx.removeMeta(SES_ATTR);
+            }
+        }
+    }
+
+    /**
+     * @param e Class cast exception.
+     * @throws IgniteCheckedException Thrown exception.
+     */
+    private void handleClassCastException(ClassCastException e) throws IgniteCheckedException {
+        assert e != null;
+
+        if (e.getMessage() != null) {
+            throw new IgniteCheckedException("Cache store must work with portable objects if portables are " +
+                "enabled for cache [cacheName=" + cctx.namex() + ']', e);
+        }
+        else
+            throw e;
+    }
+
+    /**
+     * Clears session holder.
+     */
+    void endSession(@Nullable IgniteInternalTx tx, boolean threwEx) throws IgniteCheckedException {
+        try {
+            if (tx == null)
+                store.sessionEnd(threwEx);
+        }
+        catch (Exception e) {
+            if (!threwEx)
+                throw U.cast(e);
+        }
+        finally {
+            if (sesHolder != null)
+                sesHolder.set(null);
+        }
+    }
+
+    /**
+     * @param tx Current transaction.
+     */
+    void initSession(@Nullable IgniteInternalTx tx) {
+        if (sesHolder == null)
+            return;
+
+        assert sesHolder.get() == null;
+
+        SessionData ses;
+
+        if (tx != null) {
+            ses = tx.meta(SES_ATTR);
+
+            if (ses == null) {
+                ses = new SessionData(tx, cctx.name());
+
+                tx.addMeta(SES_ATTR, ses);
+            }
+            else
+                // Session cache name may change in cross-cache transaction.
+                ses.cacheName(cctx.name());
+        }
+        else
+            ses = new SessionData(null, cctx.name());
+
+        sesHolder.set(ses);
+    }
+
+    /**
+     *
+     */
+    private static class SessionData {
+        /** */
+        @GridToStringExclude
+        private final IgniteInternalTx tx;
+
+        /** */
+        private String cacheName;
+
+        /** */
+        @GridToStringInclude
+        private Map<Object, Object> props;
+
+        /**
+         * @param tx Current transaction.
+         * @param cacheName Cache name.
+         */
+        private SessionData(@Nullable IgniteInternalTx tx, @Nullable String cacheName) {
+            this.tx = tx;
+            this.cacheName = cacheName;
+        }
+
+        /**
+         * @return Transaction.
+         */
+        @Nullable private Transaction transaction() {
+            return tx != null ? tx.proxy() : null;
+        }
+
+        /**
+         * @return Properties.
+         */
+        private Map<Object, Object> properties() {
+            if (props == null)
+                props = new GridLeanMap<>();
+
+            return props;
+        }
+
+        /**
+         * @return Cache name.
+         */
+        private String cacheName() {
+            return cacheName;
+        }
+
+        /**
+         * @param cacheName Cache name.
+         */
+        private void cacheName(String cacheName) {
+            this.cacheName = cacheName;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(SessionData.class, this, "tx", CU.txString(tx));
+        }
+    }
+
+    /**
+     *
+     */
+    private static class ThreadLocalSession implements CacheStoreSession {
+        /** */
+        private final ThreadLocal<SessionData> sesHolder = new ThreadLocal<>();
+
+        /** {@inheritDoc} */
+        @Nullable @Override public Transaction transaction() {
+            SessionData ses0 = sesHolder.get();
+
+            return ses0 != null ? ses0.transaction() : null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean isWithinTransaction() {
+            return transaction() != null;
+        }
+
+        /** {@inheritDoc} */
+        @SuppressWarnings("unchecked")
+        @Override public <K1, V1> Map<K1, V1> properties() {
+            SessionData ses0 = sesHolder.get();
+
+            return ses0 != null ? (Map<K1, V1>)ses0.properties() : null;
+        }
+
+        /** {@inheritDoc} */
+        @Nullable @Override public String cacheName() {
+            SessionData ses0 = sesHolder.get();
+
+            return ses0 != null ? ses0.cacheName() : null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(ThreadLocalSession.class, this);
+        }
+    }
+
+    /**
+     *
+     */
+    @SuppressWarnings("unchecked")
+    private class EntriesView extends AbstractCollection<Cache.Entry<?, ?>> {
+        /** */
+        private final Map<?, IgniteBiTuple<?, GridCacheVersion>> map;
+
+        /** */
+        private Set<Object> rmvd;
+
+        /** */
+        private boolean cleared;
+
+        /**
+         * @param map Map.
+         */
+        private EntriesView(Map<?, IgniteBiTuple<?, GridCacheVersion>> map) {
+            assert map != null;
+
+            this.map = map;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int size() {
+            return cleared ? 0 : (map.size() - (rmvd != null ? rmvd.size() : 0));
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean isEmpty() {
+            return cleared || !iterator().hasNext();
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean contains(Object o) {
+            if (cleared || !(o instanceof Cache.Entry))
+                return false;
+
+            Cache.Entry<?, ?> e = (Cache.Entry<?, ?>)o;
+
+            return map.containsKey(e.getKey());
+        }
+
+        /** {@inheritDoc} */
+        @NotNull @Override public Iterator<Cache.Entry<?, ?>> iterator() {
+            if (cleared)
+                return F.emptyIterator();
+
+            final Iterator<Map.Entry<?, IgniteBiTuple<?, GridCacheVersion>>> it0 = (Iterator)map.entrySet().iterator();
+
+            return new Iterator<Cache.Entry<?, ?>>() {
+                /** */
+                private Cache.Entry<?, ?> cur;
+
+                /** */
+                private Cache.Entry<?, ?> next;
+
+                /**
+                 *
+                 */
+                {
+                    checkNext();
+                }
+
+                /**
+                 *
+                 */
+                private void checkNext() {
+                    while (it0.hasNext()) {
+                        Map.Entry<?, IgniteBiTuple<?, GridCacheVersion>> e = it0.next();
+
+                        Object k = e.getKey();
+
+                        if (rmvd != null && rmvd.contains(k))
+                            continue;
+
+                        Object v = locStore ? e.getValue() : e.getValue().get1();
+
+                        if (convertPortable) {
+                            k = cctx.unwrapPortableIfNeeded(k, false);
+                            v = cctx.unwrapPortableIfNeeded(v, false);
+                        }
+
+                        next = new CacheEntryImpl<>(k, v);
+
+                        break;
+                    }
+                }
+
+                @Override public boolean hasNext() {
+                    return next != null;
+                }
+
+                @Override public Cache.Entry<?, ?> next() {
+                    if (next == null)
+                        throw new NoSuchElementException();
+
+                    cur = next;
+
+                    next = null;
+
+                    checkNext();
+
+                    return cur;
+                }
+
+                @Override public void remove() {
+                    if (cur == null)
+                        throw new IllegalStateException();
+
+                    addRemoved(cur);
+
+                    cur = null;
+                }
+            };
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean add(Cache.Entry<?, ?> entry) {
+            throw new UnsupportedOperationException();
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean addAll(Collection<? extends Cache.Entry<?, ?>> col) {
+            throw new UnsupportedOperationException();
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean remove(Object o) {
+            if (cleared || !(o instanceof Cache.Entry))
+                return false;
+
+            Cache.Entry<?, ?> e = (Cache.Entry<?, ?>)o;
+
+            if (rmvd != null && rmvd.contains(e.getKey()))
+                return false;
+
+            if (mapContains(e)) {
+                addRemoved(e);
+
+                return true;
+            }
+
+            return false;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean containsAll(Collection<?> col) {
+            if (cleared)
+                return false;
+
+            for (Object o : col) {
+                if (contains(o))
+                    return false;
+            }
+
+            return true;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean removeAll(Collection<?> col) {
+            if (cleared)
+                return false;
+
+            boolean modified = false;
+
+            for (Object o : col) {
+                if (remove(o))
+                    modified = true;
+            }
+
+            return modified;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean retainAll(Collection<?> col) {
+            if (cleared)
+                return false;
+
+            boolean modified = false;
+
+            for (Cache.Entry<?, ?> e : this) {
+                if (!col.contains(e)) {
+                    addRemoved(e);
+
+                    modified = true;
+                }
+            }
+
+            return modified;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void clear() {
+            cleared = true;
+        }
+
+        /**
+         * @param e Entry.
+         */
+        private void addRemoved(Cache.Entry<?, ?> e) {
+            if (rmvd == null)
+                rmvd = new HashSet<>();
+
+            rmvd.add(e.getKey());
+        }
+
+        /**
+         * @param e Entry.
+         * @return {@code True} if original map contains entry.
+         */
+        private boolean mapContains(Cache.Entry<?, ?> e) {
+            return map.containsKey(e.getKey());
+        }
+
+        /** {@inheritDoc} */
+        public String toString() {
+            Iterator<Cache.Entry<?, ?>> it = iterator();
+
+            if (!it.hasNext())
+                return "[]";
+
+            SB sb = new SB("[");
+
+            while (true) {
+                Cache.Entry<?, ?> e = it.next();
+
+                sb.a(e.toString());
+
+                if (!it.hasNext())
+                    return sb.a(']').toString();
+
+                sb.a(", ");
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/29377e9e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStore.java
new file mode 100644
index 0000000..9a35d5e
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStore.java
@@ -0,0 +1,1015 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.store;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.store.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.util.tostring.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.internal.util.worker.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.lifecycle.*;
+import org.apache.ignite.thread.*;
+import org.jetbrains.annotations.*;
+import org.jsr166.*;
+
+import javax.cache.integration.*;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+import java.util.concurrent.locks.*;
+
+import static javax.cache.Cache.*;
+
+/**
+ * Internal wrapper for a {@link CacheStore} that enables write-behind logic.
+ * <p/>
+ * The general purpose of this approach is to reduce cache store load under high
+ * store update rate. The idea is to cache all write and remove operations in a pending
+ * map and delegate these changes to the underlying store either after timeout or
+ * if size of a pending map exceeded some pre-configured value. Another performance gain
+ * is achieved due to combining a group of similar operations to a single batch update.
+ * <p/>
+ * The essential flush size for the write-behind cache should be at least the estimated
+ * count of simultaneously written keys. In case of significantly smaller value there would
+ * be triggered a lot of flush events that will result in a high cache store load.
+ * <p/>
+ * Since write operations to the cache store are deferred, transaction support is lost; no
+ * transaction objects are passed to the underlying store.
+ */
+public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, LifecycleAware {
+    /** Default write cache initial capacity. */
+    public static final int DFLT_INITIAL_CAPACITY = 1024;
+
+    /** Overflow ratio for critical cache size calculation. */
+    public static final float CACHE_OVERFLOW_RATIO = 1.5f;
+
+    /** Default concurrency level of write cache. */
+    public static final int DFLT_CONCUR_LVL = 64;
+
+    /** Write cache initial capacity. */
+    private int initCap = DFLT_INITIAL_CAPACITY;
+
+    /** Concurrency level for write cache access. */
+    private int concurLvl = DFLT_CONCUR_LVL;
+
+    /** When cache size exceeds this value eldest entry will be stored to the underlying store. */
+    private int cacheMaxSize = CacheConfiguration.DFLT_WRITE_BEHIND_FLUSH_SIZE;
+
+    /** Critical cache size. If cache size exceeds this value, data flush performed synchronously. */
+    private int cacheCriticalSize;
+
+    /** Count of worker threads performing underlying store updates. */
+    private int flushThreadCnt = CacheConfiguration.DFLT_WRITE_FROM_BEHIND_FLUSH_THREAD_CNT;
+
+    /** Cache flush frequency. All pending operations will be performed in not less then this value ms. */
+    private long cacheFlushFreq = CacheConfiguration.DFLT_WRITE_BEHIND_FLUSH_FREQUENCY;
+
+    /** Maximum batch size for put and remove operations */
+    private int batchSize = CacheConfiguration.DFLT_WRITE_BEHIND_BATCH_SIZE;
+
+    /** Grid name. */
+    private String gridName;
+
+    /** Cache name. */
+    private String cacheName;
+
+    /** Underlying store. */
+    private CacheStore<K, V> store;
+
+    /** Write cache. */
+    private ConcurrentLinkedHashMap<K, StatefulValue<K, V>> writeCache;
+
+    /** Flusher threads. */
+    private GridWorker[] flushThreads;
+
+    /** Atomic flag indicating store shutdown. */
+    private AtomicBoolean stopping = new AtomicBoolean(true);
+
+    /** Flush lock. */
+    private Lock flushLock = new ReentrantLock();
+
+    /** Condition to determine records available for flush. */
+    private Condition canFlush = flushLock.newCondition();
+
+    /** Variable for counting total cache overflows. */
+    private AtomicInteger cacheTotalOverflowCntr = new AtomicInteger();
+
+    /** Variable contains current number of overflow events. */
+    private AtomicInteger cacheOverflowCntr = new AtomicInteger();
+
+    /** Variable for counting key-value pairs that are in {@link ValueStatus#RETRY} state. */
+    private AtomicInteger retryEntriesCnt = new AtomicInteger();
+
+    /** Log. */
+    private IgniteLogger log;
+
+    /** Store manager. */
+    private GridCacheOsStoreManager storeMgr;
+
+    /**
+     * Creates a write-behind cache store for the given store.
+     *
+     * @param storeMgr Store manager.
+     * @param gridName Grid name.
+     * @param cacheName Cache name.
+     * @param log Grid logger.
+     * @param store {@code GridCacheStore} that need to be wrapped.
+     */
+    public GridCacheWriteBehindStore(
+        GridCacheOsStoreManager storeMgr,
+        String gridName,
+        String cacheName,
+        IgniteLogger log,
+        CacheStore<K, V> store) {
+        this.storeMgr = storeMgr;
+        this.gridName = gridName;
+        this.cacheName = cacheName;
+        this.log = log;
+        this.store = store;
+    }
+
+    /**
+     * Sets initial capacity for the write cache.
+     *
+     * @param initCap Initial capacity.
+     */
+    public void setInitialCapacity(int initCap) {
+        this.initCap = initCap;
+    }
+
+    /**
+     * Sets concurrency level for the write cache. Concurrency level is expected count of concurrent threads
+     * attempting to update cache.
+     *
+     * @param concurLvl Concurrency level.
+     */
+    public void setConcurrencyLevel(int concurLvl) {
+        this.concurLvl = concurLvl;
+    }
+
+    /**
+     * Sets the maximum size of the write cache. When the count of unique keys in write cache exceeds this value,
+     * the eldest entry in the cache is immediately scheduled for write to the underlying store.
+     *
+     * @param cacheMaxSize Max cache size.
+     */
+    public void setFlushSize(int cacheMaxSize) {
+        this.cacheMaxSize = cacheMaxSize;
+    }
+
+    /**
+     * Gets the maximum size of the write-behind buffer. When the count of unique keys
+     * in write buffer exceeds this value, the buffer is scheduled for write to the underlying store.
+     * <p/>
+     * If this value is {@code 0}, then flush is performed only on time-elapsing basis. However,
+     * when this value is {@code 0}, the cache critical size is set to
+     * {@link CacheConfiguration#DFLT_WRITE_BEHIND_CRITICAL_SIZE}
+     *
+     * @return Buffer size that triggers flush procedure.
+     */
+    public int getWriteBehindFlushSize() {
+        return cacheMaxSize;
+    }
+
+    /**
+     * Sets the number of threads that will perform store update operations.
+     *
+     * @param flushThreadCnt Count of worker threads.
+     */
+    public void setFlushThreadCount(int flushThreadCnt) {
+        this.flushThreadCnt = flushThreadCnt;
+    }
+
+    /**
+     * Gets the number of flush threads that will perform store update operations.
+     *
+     * @return Count of worker threads.
+     */
+    public int getWriteBehindFlushThreadCount() {
+        return flushThreadCnt;
+    }
+
+    /**
+     * Sets the cache flush frequency. All pending operations on the underlying store will be performed
+     * within time interval not less then this value.
+     *
+     * @param cacheFlushFreq Time interval value in milliseconds.
+     */
+    public void setFlushFrequency(long cacheFlushFreq) {
+        this.cacheFlushFreq = cacheFlushFreq;
+    }
+
+    /**
+     * Gets the cache flush frequency. All pending operations on the underlying store will be performed
+     * within time interval not less then this value.
+     * <p/>
+     * If this value is {@code 0}, then flush is performed only when buffer size exceeds flush size.
+     *
+     * @return Flush frequency in milliseconds.
+     */
+    public long getWriteBehindFlushFrequency() {
+        return cacheFlushFreq;
+    }
+
+    /**
+     * Sets the maximum count of similar operations that can be grouped to a single batch.
+     *
+     * @param batchSize Maximum count of batch.
+     */
+    public void setBatchSize(int batchSize) {
+        this.batchSize = batchSize;
+    }
+
+    /**
+     * Gets the maximum count of similar (put or remove) operations that can be grouped to a single batch.
+     *
+     * @return Maximum size of batch.
+     */
+    public int getWriteBehindStoreBatchSize() {
+        return batchSize;
+    }
+
+    /**
+     * Gets count of entries that were processed by the write-behind store and have not been
+     * flushed to the underlying store yet.
+     *
+     * @return Total count of entries in cache store internal buffer.
+     */
+    public int getWriteBehindBufferSize() {
+        return writeCache.sizex();
+    }
+
+    /**
+     * @return Underlying store.
+     */
+    public CacheStore<K, V> store() {
+        return store;
+    }
+
+    /**
+     * Performs all the initialization logic for write-behind cache store.
+     * This class must not be used until this method returns.
+     */
+    @Override public void start() {
+        assert cacheFlushFreq != 0 || cacheMaxSize != 0;
+
+        if (stopping.compareAndSet(true, false)) {
+            if (log.isDebugEnabled())
+                log.debug("Starting write-behind store for cache '" + cacheName + '\'');
+
+            cacheCriticalSize = (int)(cacheMaxSize * CACHE_OVERFLOW_RATIO);
+
+            if (cacheCriticalSize == 0)
+                cacheCriticalSize = CacheConfiguration.DFLT_WRITE_BEHIND_CRITICAL_SIZE;
+
+            flushThreads = new GridWorker[flushThreadCnt];
+
+            writeCache = new ConcurrentLinkedHashMap<>(initCap, 0.75f, concurLvl);
+
+            for (int i = 0; i < flushThreads.length; i++) {
+                flushThreads[i] = new Flusher(gridName, "flusher-" + i, log);
+
+                new IgniteThread(flushThreads[i]).start();
+            }
+        }
+    }
+
+    /**
+     * Gets count of write buffer overflow events since initialization. Each overflow event causes
+     * the ongoing flush operation to be performed synchronously.
+     *
+     * @return Count of cache overflow events since start.
+     */
+    public int getWriteBehindTotalCriticalOverflowCount() {
+        return cacheTotalOverflowCntr.get();
+    }
+
+    /**
+     * Gets count of write buffer overflow events in progress at the moment. Each overflow event causes
+     * the ongoing flush operation to be performed synchronously.
+     *
+     * @return Count of cache overflow events since start.
+     */
+    public int getWriteBehindCriticalOverflowCount() {
+        return cacheOverflowCntr.get();
+    }
+
+    /**
+     * Gets count of cache entries that are in a store-retry state. An entry is assigned a store-retry state
+     * when underlying store failed due some reason and cache has enough space to retain this entry till
+     * the next try.
+     *
+     * @return Count of entries in store-retry state.
+     */
+    public int getWriteBehindErrorRetryCount() {
+        return retryEntriesCnt.get();
+    }
+
+    /**
+     * Performs shutdown logic for store. No put, get and remove requests will be processed after
+     * this method is called.
+     */
+    @Override public void stop() {
+        if (stopping.compareAndSet(false, true)) {
+            if (log.isDebugEnabled())
+                log.debug("Stopping write-behind store for cache '" + cacheName + '\'');
+
+            wakeUp();
+
+            boolean graceful = true;
+
+            for (GridWorker worker : flushThreads)
+                graceful &= U.join(worker, log);
+
+            if (!graceful)
+                log.warning("Shutdown was aborted");
+        }
+    }
+
+    /**
+     * Forces all entries collected to be flushed to the underlying store.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void forceFlush() throws IgniteCheckedException {
+        wakeUp();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void loadCache(IgniteBiInClosure<K, V> clo, @Nullable Object... args) {
+        store.loadCache(clo, args);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Map<K, V> loadAll(Iterable<? extends K> keys) {
+        if (log.isDebugEnabled())
+            log.debug("Store load all [keys=" + keys + ']');
+
+        Map<K, V> loaded = new HashMap<>();
+
+        Collection<K> remaining = new LinkedList<>();
+
+        for (K key : keys) {
+            StatefulValue<K, V> val = writeCache.get(key);
+
+            if (val != null) {
+                val.readLock().lock();
+
+                try {
+                    if (val.operation() == StoreOperation.PUT)
+                        loaded.put(key, val.entry().getValue());
+                    else
+                        assert val.operation() == StoreOperation.RMV : val.operation();
+                }
+                finally {
+                    val.readLock().unlock();
+                }
+            }
+            else
+                remaining.add(key);
+        }
+
+        // For items that were not found in queue.
+        if (!remaining.isEmpty()) {
+            Map<K, V> loaded0 = store.loadAll(remaining);
+
+            if (loaded0 != null)
+                loaded.putAll(loaded0);
+        }
+
+        return loaded;
+    }
+
+    /** {@inheritDoc} */
+    @Override public V load(K key) {
+        if (log.isDebugEnabled())
+            log.debug("Store load [key=" + key + ']');
+
+        StatefulValue<K, V> val = writeCache.get(key);
+
+        if (val != null) {
+            val.readLock().lock();
+
+            try {
+                switch (val.operation()) {
+                    case PUT:
+                        return val.entry().getValue();
+
+                    case RMV:
+                        return null;
+
+                    default:
+                        assert false : "Unexpected operation: " + val.status();
+                }
+            }
+            finally {
+                val.readLock().unlock();
+            }
+        }
+
+        return store.load(key);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeAll(Collection<Entry<? extends K, ? extends V>> entries) {
+        for (Entry<? extends K, ? extends V> e : entries)
+            write(e);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void write(Entry<? extends K, ? extends V> entry) {
+        try {
+            if (log.isDebugEnabled())
+                log.debug("Store put [key=" + entry.getKey() + ", val=" + entry.getValue() + ']');
+
+            updateCache(entry.getKey(), entry, StoreOperation.PUT);
+        }
+        catch (IgniteInterruptedCheckedException e) {
+            throw new CacheWriterException(U.convertExceptionNoWrap(e));
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void deleteAll(Collection<?> keys) {
+        for (Object key : keys)
+            delete(key);
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override public void delete(Object key) {
+        try {
+            if (log.isDebugEnabled())
+                log.debug("Store remove [key=" + key + ']');
+
+            updateCache((K)key, null, StoreOperation.RMV);
+        }
+        catch (IgniteInterruptedCheckedException e) {
+            throw new CacheWriterException(U.convertExceptionNoWrap(e));
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void sessionEnd(boolean commit) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridCacheWriteBehindStore.class, this);
+    }
+
+    /**
+     * Performs flush-consistent cache update for the given key.
+     *
+     * @param key Key for which update is performed.
+     * @param val New value, may be null for remove operation.
+     * @param operation Updated value status
+     * @throws IgniteInterruptedCheckedException If interrupted while waiting for value to be flushed.
+     */
+    private void updateCache(K key,
+        @Nullable Entry<? extends K, ? extends V> val,
+        StoreOperation operation)
+        throws IgniteInterruptedCheckedException {
+        StatefulValue<K, V> newVal = new StatefulValue<>(val, operation);
+
+        StatefulValue<K, V> prev;
+
+        while ((prev = writeCache.putIfAbsent(key, newVal)) != null) {
+            prev.writeLock().lock();
+
+            try {
+                if (prev.status() == ValueStatus.PENDING) {
+                    // Flush process in progress, try again.
+                    prev.waitForFlush();
+
+                    continue;
+                }
+                else if (prev.status() == ValueStatus.FLUSHED)
+                    // This entry was deleted from map before we acquired the lock.
+                    continue;
+                else if (prev.status() == ValueStatus.RETRY)
+                    // New value has come, old value is no longer in RETRY state,
+                    retryEntriesCnt.decrementAndGet();
+
+                assert prev.status() == ValueStatus.NEW || prev.status() == ValueStatus.RETRY;
+
+                prev.update(val, operation, ValueStatus.NEW);
+
+                break;
+            }
+            finally {
+                prev.writeLock().unlock();
+            }
+        }
+
+        // Now check the map size
+        if (writeCache.sizex() > cacheCriticalSize)
+            // Perform single store update in the same thread.
+            flushSingleValue();
+        else if (cacheMaxSize > 0 && writeCache.sizex() > cacheMaxSize)
+            wakeUp();
+    }
+
+    /**
+     * Flushes one upcoming value to the underlying store. Called from
+     * {@link #updateCache(Object, Entry, StoreOperation)} method in case when current map size exceeds
+     * critical size.
+     */
+    private void flushSingleValue() {
+        cacheOverflowCntr.incrementAndGet();
+
+        try {
+            Map<K, StatefulValue<K, V>> batch = null;
+
+            for (Map.Entry<K, StatefulValue<K, V>> e : writeCache.entrySet()) {
+                StatefulValue<K, V> val = e.getValue();
+
+                val.writeLock().lock();
+
+                try {
+                    ValueStatus status = val.status();
+
+                    if (acquired(status))
+                        // Another thread is helping us, continue to the next entry.
+                        continue;
+
+                    if (val.status() == ValueStatus.RETRY)
+                        retryEntriesCnt.decrementAndGet();
+
+                    assert retryEntriesCnt.get() >= 0;
+
+                    val.status(ValueStatus.PENDING);
+
+                    batch = Collections.singletonMap(e.getKey(), val);
+                }
+                finally {
+                    val.writeLock().unlock();
+                }
+
+                if (!batch.isEmpty()) {
+                    applyBatch(batch, false);
+
+                    cacheTotalOverflowCntr.incrementAndGet();
+
+                    return;
+                }
+            }
+        }
+        finally {
+            cacheOverflowCntr.decrementAndGet();
+        }
+    }
+
+    /**
+     * Performs batch operation on underlying store.
+     *
+     * @param valMap Batch map.
+     * @param initSes {@code True} if need to initialize session.
+     */
+    private void applyBatch(Map<K, StatefulValue<K, V>> valMap, boolean initSes) {
+        assert valMap.size() <= batchSize;
+
+        StoreOperation operation = null;
+
+        // Construct a map for underlying store
+        Map<K, Entry<? extends K, ? extends V>> batch = U.newLinkedHashMap(valMap.size());
+
+        for (Map.Entry<K, StatefulValue<K, V>> e : valMap.entrySet()) {
+            if (operation == null)
+                operation = e.getValue().operation();
+
+            assert operation == e.getValue().operation();
+
+            assert e.getValue().status() == ValueStatus.PENDING;
+
+            batch.put(e.getKey(), e.getValue().entry());
+        }
+
+        if (updateStore(operation, batch, initSes)) {
+            for (Map.Entry<K, StatefulValue<K, V>> e : valMap.entrySet()) {
+                StatefulValue<K, V> val = e.getValue();
+
+                val.writeLock().lock();
+
+                try {
+                    val.status(ValueStatus.FLUSHED);
+
+                    StatefulValue<K, V> prev = writeCache.remove(e.getKey());
+
+                    // Additional check to ensure consistency.
+                    assert prev == val : "Map value for key " + e.getKey() + " was updated during flush";
+
+                    val.signalFlushed();
+                }
+                finally {
+                    val.writeLock().unlock();
+                }
+            }
+        }
+        else {
+            // Exception occurred, we must set RETRY status
+            for (StatefulValue<K, V> val : valMap.values()) {
+                val.writeLock().lock();
+
+                try {
+                    val.status(ValueStatus.RETRY);
+
+                    retryEntriesCnt.incrementAndGet();
+
+                    val.signalFlushed();
+                }
+                finally {
+                    val.writeLock().unlock();
+                }
+            }
+        }
+    }
+
+    /**
+     * Tries to update store with the given values and returns {@code true} in case of success.
+     *
+     * <p/> If any exception in underlying store is occurred, this method checks the map size.
+     * If map size exceeds some critical value, then it returns {@code true} and this value will
+     * be lost. If map size does not exceed critical value, it will return false and value will
+     * be retained in write cache.
+     *
+     * @param operation Status indicating operation that should be performed.
+     * @param vals Key-Value map.
+     * @param initSes {@code True} if need to initialize session.
+     * @return {@code true} if value may be deleted from the write cache,
+     *         {@code false} otherwise
+     */
+    private boolean updateStore(StoreOperation operation,
+        Map<K, Entry<? extends K, ? extends  V>> vals,
+        boolean initSes) {
+
+        if (initSes && storeMgr != null)
+            storeMgr.initSession(null);
+
+        try {
+            boolean threwEx = true;
+
+            try {
+                switch (operation) {
+                    case PUT:
+                        store.writeAll(vals.values());
+
+                        break;
+
+                    case RMV:
+                        store.deleteAll(vals.keySet());
+
+                        break;
+
+                    default:
+                        assert false : "Unexpected operation: " + operation;
+                }
+
+                threwEx = false;
+
+                return true;
+            }
+            finally {
+                if (initSes && storeMgr != null)
+                    storeMgr.endSession(null, threwEx);
+            }
+        }
+        catch (Exception e) {
+            LT.warn(log, e, "Unable to update underlying store: " + store);
+
+            if (writeCache.sizex() > cacheCriticalSize || stopping.get()) {
+                for (Map.Entry<K, Entry<? extends K, ? extends  V>> entry : vals.entrySet()) {
+                    Object val = entry.getValue() != null ? entry.getValue().getValue() : null;
+
+                    log.warning("Failed to update store (value will be lost as current buffer size is greater " +
+                        "than 'cacheCriticalSize' or node has been stopped before store was repaired) [key=" +
+                        entry.getKey() + ", val=" + val + ", op=" + operation + "]");
+                }
+
+                return true;
+            }
+
+            return false;
+        }
+    }
+
+    /**
+     * Wakes up flushing threads if map size exceeded maximum value or in case of shutdown.
+     */
+    private void wakeUp() {
+        flushLock.lock();
+
+        try {
+            canFlush.signalAll();
+        }
+        finally {
+            flushLock.unlock();
+        }
+    }
+
+    /**
+     * Thread that performs time-based flushing of written values to the underlying storage.
+     */
+    private class Flusher extends GridWorker {
+        /** {@inheritDoc */
+        protected Flusher(String gridName, String name, IgniteLogger log) {
+            super(gridName, name, log);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
+            while (!stopping.get() || writeCache.sizex() > 0) {
+                awaitOperationsAvailable();
+
+                flushCache(writeCache.entrySet().iterator());
+            }
+        }
+
+        /**
+         * This method awaits until enough elements in map are available or given timeout is over.
+         *
+         * @throws InterruptedException If awaiting was interrupted.
+         */
+        private void awaitOperationsAvailable() throws InterruptedException {
+            flushLock.lock();
+
+            try {
+                do {
+                    if (writeCache.sizex() <= cacheMaxSize || cacheMaxSize == 0) {
+                        if (cacheFlushFreq > 0)
+                            canFlush.await(cacheFlushFreq, TimeUnit.MILLISECONDS);
+                        else
+                            canFlush.await();
+                    }
+                }
+                while (writeCache.sizex() == 0 && !stopping.get());
+            }
+            finally {
+                flushLock.unlock();
+            }
+        }
+
+        /**
+         * Removes values from the write cache and performs corresponding operation
+         * on the underlying store.
+         *
+         * @param it Iterator for write cache.
+         */
+        private void flushCache(Iterator<Map.Entry<K,StatefulValue<K, V>>> it) {
+            StoreOperation operation = null;
+
+            Map<K, StatefulValue<K, V>> batch = null;
+            Map<K, StatefulValue<K, V>> pending  = U.newLinkedHashMap(batchSize);
+
+            while (it.hasNext()) {
+                Map.Entry<K, StatefulValue<K, V>> e = it.next();
+
+                StatefulValue<K, V> val = e.getValue();
+
+                val.writeLock().lock();
+
+                try {
+                    ValueStatus status = val.status();
+
+                    if (acquired(status))
+                        // Another thread is helping us, continue to the next entry.
+                        continue;
+
+                    if (status == ValueStatus.RETRY)
+                        retryEntriesCnt.decrementAndGet();
+
+                    assert retryEntriesCnt.get() >= 0;
+
+                    val.status(ValueStatus.PENDING);
+
+                    // We scan for the next operation and apply batch on operation change. Null means new batch.
+                    if (operation == null)
+                        operation = val.operation();
+
+                    if (operation != val.operation()) {
+                        // Operation is changed, so we need to perform a batch.
+                        batch = pending;
+                        pending = U.newLinkedHashMap(batchSize);
+
+                        operation = val.operation();
+
+                        pending.put(e.getKey(), val);
+                    }
+                    else
+                        pending.put(e.getKey(), val);
+
+                    if (pending.size() == batchSize) {
+                        batch = pending;
+                        pending = U.newLinkedHashMap(batchSize);
+
+                        operation = null;
+                    }
+                }
+                finally {
+                    val.writeLock().unlock();
+                }
+
+                if (batch != null && !batch.isEmpty()) {
+                    applyBatch(batch, true);
+                    batch = null;
+                }
+            }
+
+            // Process the remainder.
+            if (!pending.isEmpty())
+                applyBatch(pending, true);
+        }
+    }
+
+    /**
+     * For test purposes only.
+     *
+     * @return Write cache for the underlying store operations.
+     */
+    Map<K, StatefulValue<K, V>> writeCache() {
+        return writeCache;
+    }
+
+    /**
+     * Enumeration that represents possible operations on the underlying store.
+     */
+    private enum StoreOperation {
+        /** Put key-value pair to the underlying store. */
+        PUT,
+
+        /** Remove key from the underlying store. */
+        RMV
+    }
+
+    /**
+     * Enumeration that represents possible states of value in the map.
+     */
+    private enum ValueStatus {
+        /** Value is scheduled for write or delete from the underlying cache but has not been captured by flusher. */
+        NEW,
+
+        /** Value is captured by flusher and store operation is performed at the moment. */
+        PENDING,
+
+        /** Store operation has failed and it will be re-tried at the next flush. */
+        RETRY,
+
+        /** Store operation succeeded and this value will be removed by flusher. */
+        FLUSHED,
+    }
+
+    /**
+     * Checks if given status indicates pending or complete flush operation.
+     *
+     * @param status Status to check.
+     * @return {@code true} if status indicates any pending or complete store update operation.
+     */
+    private boolean acquired(ValueStatus status) {
+        return status == ValueStatus.PENDING || status == ValueStatus.FLUSHED;
+    }
+
+    /**
+     * A state-value-operation trio.
+     *
+     * @param <V> Value type.
+     */
+    private static class StatefulValue<K, V> extends ReentrantReadWriteLock {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** Value. */
+        @GridToStringInclude
+        private Entry<? extends K, ? extends V> val;
+
+        /** Store operation. */
+        private StoreOperation storeOperation;
+
+        /** Value status. */
+        private ValueStatus valStatus;
+
+        /** Condition to wait for flush event */
+        private Condition flushCond = writeLock().newCondition();
+
+        /**
+         * Creates a state-value pair with {@link ValueStatus#NEW} status.
+         *
+         * @param val Value.
+         * @param storeOperation Store operation.
+         */
+        private StatefulValue(Entry<? extends K, ? extends V> val, StoreOperation storeOperation) {
+            assert storeOperation == StoreOperation.PUT || storeOperation == StoreOperation.RMV;
+
+            this.val = val;
+            this.storeOperation = storeOperation;
+            valStatus = ValueStatus.NEW;
+        }
+
+        /**
+         * @return Stored value.
+         */
+        private Entry<? extends K, ? extends V> entry() {
+            return val;
+        }
+
+        /**
+         * @return Store operation.
+         */
+        private StoreOperation operation() {
+            return storeOperation;
+        }
+
+        /**
+         * @return Value status
+         */
+        private ValueStatus status() {
+            return valStatus;
+        }
+
+        /**
+         * Updates value status.
+         *
+         * @param valStatus Value status.
+         */
+        private void status(ValueStatus valStatus) {
+            this.valStatus = valStatus;
+        }
+
+        /**
+         * Updates both value and value status.
+         *
+         * @param val Value.
+         * @param storeOperation Store operation.
+         * @param valStatus Value status.
+         */
+        private void update(@Nullable Entry<? extends K, ? extends V> val,
+            StoreOperation storeOperation,
+            ValueStatus valStatus) {
+            this.val = val;
+            this.storeOperation = storeOperation;
+            this.valStatus = valStatus;
+        }
+
+        /**
+         * Awaits a signal on flush condition
+         *
+         * @throws IgniteInterruptedCheckedException If thread was interrupted.
+         */
+        private void waitForFlush() throws IgniteInterruptedCheckedException {
+            U.await(flushCond);
+        }
+
+        /**
+         * Signals flush condition.
+         */
+        @SuppressWarnings({"SignalWithoutCorrespondingAwait"})
+        private void signalFlushed() {
+            flushCond.signalAll();
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (!(o instanceof StatefulValue))
+                return false;
+
+            StatefulValue other = (StatefulValue)o;
+
+            return F.eq(val, other.val) && F.eq(valStatus, other.valStatus);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            int res = val != null ? val.hashCode() : 0;
+
+            res = 31 * res + valStatus.hashCode();
+
+            return res;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(StatefulValue.class, this);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/29377e9e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index acd3202..4dca9e4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -24,6 +24,7 @@ import org.apache.ignite.internal.managers.communication.*;
 import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.distributed.near.*;
+import org.apache.ignite.internal.processors.cache.store.*;
 import org.apache.ignite.internal.processors.cache.version.*;
 import org.apache.ignite.internal.transactions.*;
 import org.apache.ignite.internal.util.*;
@@ -444,11 +445,11 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
      *
      * @return Store manager.
      */
-    protected GridCacheStoreManager store() {
+    protected GridCacheOsStoreManager store() {
         if (!activeCacheIds().isEmpty()) {
             int cacheId = F.first(activeCacheIds());
 
-            GridCacheStoreManager store = cctx.cacheContext(cacheId).store();
+            GridCacheOsStoreManager store = cctx.cacheContext(cacheId).store();
 
             return store.configured() ? store : null;
         }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/29377e9e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index 9d7f332..e051385 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -25,6 +25,7 @@ import org.apache.ignite.internal.managers.communication.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.distributed.near.*;
 import org.apache.ignite.internal.processors.cache.dr.*;
+import org.apache.ignite.internal.processors.cache.store.*;
 import org.apache.ignite.internal.processors.cache.version.*;
 import org.apache.ignite.internal.processors.dr.*;
 import org.apache.ignite.internal.transactions.*;
@@ -492,7 +493,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
      */
     @SuppressWarnings({"CatchGenericClass"})
     protected void batchStoreCommit(Iterable<IgniteTxEntry> writeEntries) throws IgniteCheckedException {
-        GridCacheStoreManager store = store();
+        GridCacheOsStoreManager store = store();
 
         if (store != null && store.writeThrough() && storeEnabled() &&
             (!internal() || groupLock()) && (near() || store.writeToStoreFromDht())) {
@@ -500,7 +501,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                 if (writeEntries != null) {
                     Map<Object, IgniteBiTuple<Object, GridCacheVersion>> putMap = null;
                     List<Object> rmvCol = null;
-                    GridCacheStoreManager writeStore = null;
+                    GridCacheOsStoreManager writeStore = null;
 
                     boolean skipNear = near() && store.writeToStoreFromDht();
 
@@ -981,7 +982,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
             }
         }
         else {
-            GridCacheStoreManager store = store();
+            GridCacheOsStoreManager store = store();
 
             if (store != null && (!internal() || groupLock())) {
                 try {
@@ -1086,7 +1087,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
 
                 cctx.tm().rollbackTx(this);
 
-                GridCacheStoreManager store = store();
+                GridCacheOsStoreManager store = store();
 
                 if (store != null && (near() || store.writeToStoreFromDht())) {
                     if (!internal() || groupLock())

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/29377e9e/modules/core/src/main/resources/META-INF/classnames.properties
----------------------------------------------------------------------
diff --git a/modules/core/src/main/resources/META-INF/classnames.properties b/modules/core/src/main/resources/META-INF/classnames.properties
index ccc6aae..32e0caf 100644
--- a/modules/core/src/main/resources/META-INF/classnames.properties
+++ b/modules/core/src/main/resources/META-INF/classnames.properties
@@ -389,10 +389,10 @@ org.apache.ignite.internal.processors.cache.GridCacheProcessor$LocalAffinityFunc
 org.apache.ignite.internal.processors.cache.GridCacheProjectionImpl
 org.apache.ignite.internal.processors.cache.GridCacheProxyImpl
 org.apache.ignite.internal.processors.cache.GridCacheReturn
-org.apache.ignite.internal.processors.cache.GridCacheStoreManager$1
-org.apache.ignite.internal.processors.cache.GridCacheStoreManager$2
-org.apache.ignite.internal.processors.cache.GridCacheStoreManager$3
-org.apache.ignite.internal.processors.cache.GridCacheStoreManager$4
+org.apache.ignite.internal.processors.cache.store.GridCacheOsStoreManager$1
+org.apache.ignite.internal.processors.cache.store.GridCacheOsStoreManager$2
+org.apache.ignite.internal.processors.cache.store.GridCacheOsStoreManager$3
+org.apache.ignite.internal.processors.cache.store.GridCacheOsStoreManager$4
 org.apache.ignite.internal.processors.cache.GridCacheSwapManager$10
 org.apache.ignite.internal.processors.cache.GridCacheSwapManager$12
 org.apache.ignite.internal.processors.cache.GridCacheSwapManager$14
@@ -432,9 +432,9 @@ org.apache.ignite.internal.processors.cache.GridCacheUtils$8
 org.apache.ignite.internal.processors.cache.GridCacheUtils$9
 org.apache.ignite.internal.processors.cache.GridCacheValueCollection
 org.apache.ignite.internal.processors.cache.GridCacheValueCollection$1
-org.apache.ignite.internal.processors.cache.GridCacheWriteBehindStore$StatefulValue
-org.apache.ignite.internal.processors.cache.GridCacheWriteBehindStore$StoreOperation
-org.apache.ignite.internal.processors.cache.GridCacheWriteBehindStore$ValueStatus
+org.apache.ignite.internal.processors.cache.store.GridCacheWriteBehindStore$StatefulValue
+org.apache.ignite.internal.processors.cache.store.GridCacheWriteBehindStore$StoreOperation
+org.apache.ignite.internal.processors.cache.store.GridCacheWriteBehindStore$ValueStatus
 org.apache.ignite.internal.processors.cache.GridPartitionLockKey
 org.apache.ignite.cache.CacheExistsException
 org.apache.ignite.internal.processors.cache.IgniteCacheProxy

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/29377e9e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedWritesTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedWritesTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedWritesTest.java
index b9f9602..c4ba385 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedWritesTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedWritesTest.java
@@ -26,14 +26,13 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
 import org.apache.ignite.testframework.junits.common.*;
 import org.apache.ignite.transactions.*;
 
-import javax.cache.configuration.*;
 import java.util.concurrent.atomic.*;
 
 import static org.apache.ignite.cache.CacheAtomicityMode.*;
 
 /**
  * Test that in {@link CacheMode#PARTITIONED} mode cache writes values only to the near cache store. <p/> This check
- * is needed because in current implementation if {@link GridCacheWriteBehindStore} assumes that and user store is
+ * is needed because in current implementation if {@link org.apache.ignite.internal.processors.cache.store.GridCacheWriteBehindStore} assumes that and user store is
  * wrapped only in near cache (see {@link GridCacheProcessor} init logic).
  */
 @SuppressWarnings({"unchecked"})

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/29377e9e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStoreAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStoreAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStoreAbstractSelfTest.java
deleted file mode 100644
index 883a216..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStoreAbstractSelfTest.java
+++ /dev/null
@@ -1,189 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache;
-
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.testframework.junits.common.*;
-
-import java.util.*;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.*;
-
-/**
- * Harness for {@link GridCacheWriteBehindStore} tests.
- */
-public abstract class GridCacheWriteBehindStoreAbstractSelfTest extends GridCommonAbstractTest {
-    /** Write cache size. */
-    public static final int CACHE_SIZE = 1024;
-
-    /** Value dump interval. */
-    public static final int FLUSH_FREQUENCY = 1000;
-
-    /** Underlying store. */
-    protected GridCacheTestStore delegate = new GridCacheTestStore();
-
-    /** Tested store. */
-    protected GridCacheWriteBehindStore<Integer, String> store;
-
-    /** {@inheritDoc} */
-    @Override protected void afterTestsStopped() throws Exception {
-        delegate = null;
-        store = null;
-
-        super.afterTestsStopped();
-    }
-
-    /**
-     * Initializes store.
-     *
-     * @param flushThreadCnt Count of flush threads
-     * @throws Exception If failed.
-     */
-    protected void initStore(int flushThreadCnt) throws Exception {
-        store = new GridCacheWriteBehindStore<>(null, "", "", log, delegate);
-
-        store.setFlushFrequency(FLUSH_FREQUENCY);
-
-        store.setFlushSize(CACHE_SIZE);
-
-        store.setFlushThreadCount(flushThreadCnt);
-
-        delegate.reset();
-
-        store.start();
-    }
-
-    /**
-     * Shutdowns store.
-     *
-     * @throws Exception If failed.
-     */
-    protected void shutdownStore() throws Exception {
-        store.stop();
-
-        assertTrue("Store cache must be empty after shutdown", store.writeCache().isEmpty());
-    }
-
-    /**
-     * Performs multiple put, get and remove operations in several threads on a store. After
-     * all threads finished their operations, returns the total set of keys that should be
-     * in underlying store.
-     *
-     * @param threadCnt Count of threads that should update keys.
-     * @param keysPerThread Count of unique keys assigned to a thread.
-     * @return Set of keys that was totally put in store.
-     * @throws Exception If failed.
-     */
-    protected Set<Integer> runPutGetRemoveMultithreaded(int threadCnt, final int keysPerThread) throws Exception {
-        final ConcurrentMap<String, Set<Integer>> perThread = new ConcurrentHashMap<>();
-
-        final AtomicBoolean running = new AtomicBoolean(true);
-
-        final AtomicInteger cntr = new AtomicInteger();
-
-        final AtomicInteger operations = new AtomicInteger();
-
-        IgniteInternalFuture<?> fut = multithreadedAsync(new Runnable() {
-            @SuppressWarnings({"NullableProblems"})
-            @Override public void run() {
-                // Initialize key set for this thread.
-                Set<Integer> set = new HashSet<>();
-
-                Set<Integer> old = perThread.putIfAbsent(Thread.currentThread().getName(), set);
-
-                if (old != null)
-                    set = old;
-
-                List<Integer> original = new ArrayList<>();
-
-                Random rnd = new Random();
-
-                for (int i = 0; i < keysPerThread; i++)
-                    original.add(cntr.getAndIncrement());
-
-                try {
-                    while (running.get()) {
-                        int op = rnd.nextInt(3);
-                        int idx = rnd.nextInt(keysPerThread);
-
-                        int key = original.get(idx);
-
-                        switch (op) {
-                            case 0:
-                                store.write(new CacheEntryImpl<>(key, "val" + key));
-                                set.add(key);
-
-                                operations.incrementAndGet();
-
-                                break;
-
-                            case 1:
-                                store.delete(key);
-                                set.remove(key);
-
-                                operations.incrementAndGet();
-
-                                break;
-
-                            case 2:
-                            default:
-                                store.write(new CacheEntryImpl<>(key, "broken"));
-
-                                String val = store.load(key);
-
-                                assertEquals("Invalid intermediate value: " + val, "broken", val);
-
-                                store.write(new CacheEntryImpl<>(key, "val" + key));
-
-                                set.add(key);
-
-                                // 2 put operations performed here.
-                                operations.incrementAndGet();
-                                operations.incrementAndGet();
-                                operations.incrementAndGet();
-
-                                break;
-                        }
-                    }
-                }
-                catch (Exception e) {
-                    error("Unexpected exception in put thread", e);
-
-                    assert false;
-                }
-            }
-        }, threadCnt, "put");
-
-        U.sleep(10000);
-
-        running.set(false);
-
-        fut.get();
-
-        log().info(">>> " + operations + " operations performed totally");
-
-        Set<Integer> total = new HashSet<>();
-
-        for (Set<Integer> threadVals : perThread.values()) {
-            total.addAll(threadVals);
-        }
-
-        return total;
-    }
-}


Mime
View raw message