ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [26/37] incubator-ignite git commit: # GG-9973: Fixed.
Date Fri, 03 Apr 2015 09:13:23 GMT
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6629012/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStore.java
deleted file mode 100644
index c26c34e..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStore.java
+++ /dev/null
@@ -1,1015 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cache.store.*;
-import org.apache.ignite.configuration.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.util.tostring.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.internal.util.worker.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.lifecycle.*;
-import org.apache.ignite.thread.*;
-import org.jetbrains.annotations.*;
-import org.jsr166.*;
-
-import javax.cache.integration.*;
-import java.util.*;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.*;
-import java.util.concurrent.locks.*;
-
-import static javax.cache.Cache.*;
-
-/**
- * Internal wrapper for a {@link CacheStore} that enables write-behind logic.
- * <p/>
- * The general purpose of this approach is to reduce cache store load under high
- * store update rate. The idea is to cache all write and remove operations in a pending
- * map and delegate these changes to the underlying store either after timeout or
- * if size of a pending map exceeded some pre-configured value. Another performance gain
- * is achieved due to combining a group of similar operations to a single batch update.
- * <p/>
- * The essential flush size for the write-behind cache should be at least the estimated
- * count of simultaneously written keys. In case of significantly smaller value there would
- * be triggered a lot of flush events that will result in a high cache store load.
- * <p/>
- * Since write operations to the cache store are deferred, transaction support is lost; no
- * transaction objects are passed to the underlying store.
- */
-public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, LifecycleAware {
-    /** Default write cache initial capacity. */
-    public static final int DFLT_INITIAL_CAPACITY = 1024;
-
-    /** Overflow ratio for critical cache size calculation. */
-    public static final float CACHE_OVERFLOW_RATIO = 1.5f;
-
-    /** Default concurrency level of write cache. */
-    public static final int DFLT_CONCUR_LVL = 64;
-
-    /** Write cache initial capacity. */
-    private int initCap = DFLT_INITIAL_CAPACITY;
-
-    /** Concurrency level for write cache access. */
-    private int concurLvl = DFLT_CONCUR_LVL;
-
-    /** When cache size exceeds this value eldest entry will be stored to the underlying store. */
-    private int cacheMaxSize = CacheConfiguration.DFLT_WRITE_BEHIND_FLUSH_SIZE;
-
-    /** Critical cache size. If cache size exceeds this value, data flush performed synchronously. */
-    private int cacheCriticalSize;
-
-    /** Count of worker threads performing underlying store updates. */
-    private int flushThreadCnt = CacheConfiguration.DFLT_WRITE_FROM_BEHIND_FLUSH_THREAD_CNT;
-
-    /** Cache flush frequency. All pending operations will be performed in not less then this value ms. */
-    private long cacheFlushFreq = CacheConfiguration.DFLT_WRITE_BEHIND_FLUSH_FREQUENCY;
-
-    /** Maximum batch size for put and remove operations */
-    private int batchSize = CacheConfiguration.DFLT_WRITE_BEHIND_BATCH_SIZE;
-
-    /** Grid name. */
-    private String gridName;
-
-    /** Cache name. */
-    private String cacheName;
-
-    /** Underlying store. */
-    private CacheStore<K, V> store;
-
-    /** Write cache. */
-    private ConcurrentLinkedHashMap<K, StatefulValue<K, V>> writeCache;
-
-    /** Flusher threads. */
-    private GridWorker[] flushThreads;
-
-    /** Atomic flag indicating store shutdown. */
-    private AtomicBoolean stopping = new AtomicBoolean(true);
-
-    /** Flush lock. */
-    private Lock flushLock = new ReentrantLock();
-
-    /** Condition to determine records available for flush. */
-    private Condition canFlush = flushLock.newCondition();
-
-    /** Variable for counting total cache overflows. */
-    private AtomicInteger cacheTotalOverflowCntr = new AtomicInteger();
-
-    /** Variable contains current number of overflow events. */
-    private AtomicInteger cacheOverflowCntr = new AtomicInteger();
-
-    /** Variable for counting key-value pairs that are in {@link ValueStatus#RETRY} state. */
-    private AtomicInteger retryEntriesCnt = new AtomicInteger();
-
-    /** Log. */
-    private IgniteLogger log;
-
-    /** Store manager. */
-    private GridCacheStoreManager storeMgr;
-
-    /**
-     * Creates a write-behind cache store for the given store.
-     *
-     * @param storeMgr Store manager.
-     * @param gridName Grid name.
-     * @param cacheName Cache name.
-     * @param log Grid logger.
-     * @param store {@code GridCacheStore} that need to be wrapped.
-     */
-    public GridCacheWriteBehindStore(
-        GridCacheStoreManager storeMgr,
-        String gridName,
-        String cacheName,
-        IgniteLogger log,
-        CacheStore<K, V> store) {
-        this.storeMgr = storeMgr;
-        this.gridName = gridName;
-        this.cacheName = cacheName;
-        this.log = log;
-        this.store = store;
-    }
-
-    /**
-     * Sets initial capacity for the write cache.
-     *
-     * @param initCap Initial capacity.
-     */
-    public void setInitialCapacity(int initCap) {
-        this.initCap = initCap;
-    }
-
-    /**
-     * Sets concurrency level for the write cache. Concurrency level is expected count of concurrent threads
-     * attempting to update cache.
-     *
-     * @param concurLvl Concurrency level.
-     */
-    public void setConcurrencyLevel(int concurLvl) {
-        this.concurLvl = concurLvl;
-    }
-
-    /**
-     * Sets the maximum size of the write cache. When the count of unique keys in write cache exceeds this value,
-     * the eldest entry in the cache is immediately scheduled for write to the underlying store.
-     *
-     * @param cacheMaxSize Max cache size.
-     */
-    public void setFlushSize(int cacheMaxSize) {
-        this.cacheMaxSize = cacheMaxSize;
-    }
-
-    /**
-     * Gets the maximum size of the write-behind buffer. When the count of unique keys
-     * in write buffer exceeds this value, the buffer is scheduled for write to the underlying store.
-     * <p/>
-     * If this value is {@code 0}, then flush is performed only on time-elapsing basis. However,
-     * when this value is {@code 0}, the cache critical size is set to
-     * {@link CacheConfiguration#DFLT_WRITE_BEHIND_CRITICAL_SIZE}
-     *
-     * @return Buffer size that triggers flush procedure.
-     */
-    public int getWriteBehindFlushSize() {
-        return cacheMaxSize;
-    }
-
-    /**
-     * Sets the number of threads that will perform store update operations.
-     *
-     * @param flushThreadCnt Count of worker threads.
-     */
-    public void setFlushThreadCount(int flushThreadCnt) {
-        this.flushThreadCnt = flushThreadCnt;
-    }
-
-    /**
-     * Gets the number of flush threads that will perform store update operations.
-     *
-     * @return Count of worker threads.
-     */
-    public int getWriteBehindFlushThreadCount() {
-        return flushThreadCnt;
-    }
-
-    /**
-     * Sets the cache flush frequency. All pending operations on the underlying store will be performed
-     * within time interval not less then this value.
-     *
-     * @param cacheFlushFreq Time interval value in milliseconds.
-     */
-    public void setFlushFrequency(long cacheFlushFreq) {
-        this.cacheFlushFreq = cacheFlushFreq;
-    }
-
-    /**
-     * Gets the cache flush frequency. All pending operations on the underlying store will be performed
-     * within time interval not less then this value.
-     * <p/>
-     * If this value is {@code 0}, then flush is performed only when buffer size exceeds flush size.
-     *
-     * @return Flush frequency in milliseconds.
-     */
-    public long getWriteBehindFlushFrequency() {
-        return cacheFlushFreq;
-    }
-
-    /**
-     * Sets the maximum count of similar operations that can be grouped to a single batch.
-     *
-     * @param batchSize Maximum count of batch.
-     */
-    public void setBatchSize(int batchSize) {
-        this.batchSize = batchSize;
-    }
-
-    /**
-     * Gets the maximum count of similar (put or remove) operations that can be grouped to a single batch.
-     *
-     * @return Maximum size of batch.
-     */
-    public int getWriteBehindStoreBatchSize() {
-        return batchSize;
-    }
-
-    /**
-     * Gets count of entries that were processed by the write-behind store and have not been
-     * flushed to the underlying store yet.
-     *
-     * @return Total count of entries in cache store internal buffer.
-     */
-    public int getWriteBehindBufferSize() {
-        return writeCache.sizex();
-    }
-
-    /**
-     * @return Underlying store.
-     */
-    public CacheStore<K, V> store() {
-        return store;
-    }
-
-    /**
-     * Performs all the initialization logic for write-behind cache store.
-     * This class must not be used until this method returns.
-     */
-    @Override public void start() {
-        assert cacheFlushFreq != 0 || cacheMaxSize != 0;
-
-        if (stopping.compareAndSet(true, false)) {
-            if (log.isDebugEnabled())
-                log.debug("Starting write-behind store for cache '" + cacheName + '\'');
-
-            cacheCriticalSize = (int)(cacheMaxSize * CACHE_OVERFLOW_RATIO);
-
-            if (cacheCriticalSize == 0)
-                cacheCriticalSize = CacheConfiguration.DFLT_WRITE_BEHIND_CRITICAL_SIZE;
-
-            flushThreads = new GridWorker[flushThreadCnt];
-
-            writeCache = new ConcurrentLinkedHashMap<>(initCap, 0.75f, concurLvl);
-
-            for (int i = 0; i < flushThreads.length; i++) {
-                flushThreads[i] = new Flusher(gridName, "flusher-" + i, log);
-
-                new IgniteThread(flushThreads[i]).start();
-            }
-        }
-    }
-
-    /**
-     * Gets count of write buffer overflow events since initialization. Each overflow event causes
-     * the ongoing flush operation to be performed synchronously.
-     *
-     * @return Count of cache overflow events since start.
-     */
-    public int getWriteBehindTotalCriticalOverflowCount() {
-        return cacheTotalOverflowCntr.get();
-    }
-
-    /**
-     * Gets count of write buffer overflow events in progress at the moment. Each overflow event causes
-     * the ongoing flush operation to be performed synchronously.
-     *
-     * @return Count of cache overflow events since start.
-     */
-    public int getWriteBehindCriticalOverflowCount() {
-        return cacheOverflowCntr.get();
-    }
-
-    /**
-     * Gets count of cache entries that are in a store-retry state. An entry is assigned a store-retry state
-     * when underlying store failed due some reason and cache has enough space to retain this entry till
-     * the next try.
-     *
-     * @return Count of entries in store-retry state.
-     */
-    public int getWriteBehindErrorRetryCount() {
-        return retryEntriesCnt.get();
-    }
-
-    /**
-     * Performs shutdown logic for store. No put, get and remove requests will be processed after
-     * this method is called.
-     */
-    @Override public void stop() {
-        if (stopping.compareAndSet(false, true)) {
-            if (log.isDebugEnabled())
-                log.debug("Stopping write-behind store for cache '" + cacheName + '\'');
-
-            wakeUp();
-
-            boolean graceful = true;
-
-            for (GridWorker worker : flushThreads)
-                graceful &= U.join(worker, log);
-
-            if (!graceful)
-                log.warning("Shutdown was aborted");
-        }
-    }
-
-    /**
-     * Forces all entries collected to be flushed to the underlying store.
-     * @throws IgniteCheckedException If failed.
-     */
-    public void forceFlush() throws IgniteCheckedException {
-        wakeUp();
-    }
-
-    /** {@inheritDoc} */
-    @Override public void loadCache(IgniteBiInClosure<K, V> clo, @Nullable Object... args) {
-        store.loadCache(clo, args);
-    }
-
-    /** {@inheritDoc} */
-    @Override public Map<K, V> loadAll(Iterable<? extends K> keys) {
-        if (log.isDebugEnabled())
-            log.debug("Store load all [keys=" + keys + ']');
-
-        Map<K, V> loaded = new HashMap<>();
-
-        Collection<K> remaining = new LinkedList<>();
-
-        for (K key : keys) {
-            StatefulValue<K, V> val = writeCache.get(key);
-
-            if (val != null) {
-                val.readLock().lock();
-
-                try {
-                    if (val.operation() == StoreOperation.PUT)
-                        loaded.put(key, val.entry().getValue());
-                    else
-                        assert val.operation() == StoreOperation.RMV : val.operation();
-                }
-                finally {
-                    val.readLock().unlock();
-                }
-            }
-            else
-                remaining.add(key);
-        }
-
-        // For items that were not found in queue.
-        if (!remaining.isEmpty()) {
-            Map<K, V> loaded0 = store.loadAll(remaining);
-
-            if (loaded0 != null)
-                loaded.putAll(loaded0);
-        }
-
-        return loaded;
-    }
-
-    /** {@inheritDoc} */
-    @Override public V load(K key) {
-        if (log.isDebugEnabled())
-            log.debug("Store load [key=" + key + ']');
-
-        StatefulValue<K, V> val = writeCache.get(key);
-
-        if (val != null) {
-            val.readLock().lock();
-
-            try {
-                switch (val.operation()) {
-                    case PUT:
-                        return val.entry().getValue();
-
-                    case RMV:
-                        return null;
-
-                    default:
-                        assert false : "Unexpected operation: " + val.status();
-                }
-            }
-            finally {
-                val.readLock().unlock();
-            }
-        }
-
-        return store.load(key);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void writeAll(Collection<Entry<? extends K, ? extends V>> entries) {
-        for (Entry<? extends K, ? extends V> e : entries)
-            write(e);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void write(Entry<? extends K, ? extends V> entry) {
-        try {
-            if (log.isDebugEnabled())
-                log.debug("Store put [key=" + entry.getKey() + ", val=" + entry.getValue() + ']');
-
-            updateCache(entry.getKey(), entry, StoreOperation.PUT);
-        }
-        catch (IgniteInterruptedCheckedException e) {
-            throw new CacheWriterException(U.convertExceptionNoWrap(e));
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void deleteAll(Collection<?> keys) {
-        for (Object key : keys)
-            delete(key);
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings("unchecked")
-    @Override public void delete(Object key) {
-        try {
-            if (log.isDebugEnabled())
-                log.debug("Store remove [key=" + key + ']');
-
-            updateCache((K)key, null, StoreOperation.RMV);
-        }
-        catch (IgniteInterruptedCheckedException e) {
-            throw new CacheWriterException(U.convertExceptionNoWrap(e));
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void sessionEnd(boolean commit) {
-        // No-op.
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(GridCacheWriteBehindStore.class, this);
-    }
-
-    /**
-     * Performs flush-consistent cache update for the given key.
-     *
-     * @param key Key for which update is performed.
-     * @param val New value, may be null for remove operation.
-     * @param operation Updated value status
-     * @throws IgniteInterruptedCheckedException If interrupted while waiting for value to be flushed.
-     */
-    private void updateCache(K key,
-        @Nullable Entry<? extends K, ? extends V> val,
-        StoreOperation operation)
-        throws IgniteInterruptedCheckedException {
-        StatefulValue<K, V> newVal = new StatefulValue<>(val, operation);
-
-        StatefulValue<K, V> prev;
-
-        while ((prev = writeCache.putIfAbsent(key, newVal)) != null) {
-            prev.writeLock().lock();
-
-            try {
-                if (prev.status() == ValueStatus.PENDING) {
-                    // Flush process in progress, try again.
-                    prev.waitForFlush();
-
-                    continue;
-                }
-                else if (prev.status() == ValueStatus.FLUSHED)
-                    // This entry was deleted from map before we acquired the lock.
-                    continue;
-                else if (prev.status() == ValueStatus.RETRY)
-                    // New value has come, old value is no longer in RETRY state,
-                    retryEntriesCnt.decrementAndGet();
-
-                assert prev.status() == ValueStatus.NEW || prev.status() == ValueStatus.RETRY;
-
-                prev.update(val, operation, ValueStatus.NEW);
-
-                break;
-            }
-            finally {
-                prev.writeLock().unlock();
-            }
-        }
-
-        // Now check the map size
-        if (writeCache.sizex() > cacheCriticalSize)
-            // Perform single store update in the same thread.
-            flushSingleValue();
-        else if (cacheMaxSize > 0 && writeCache.sizex() > cacheMaxSize)
-            wakeUp();
-    }
-
-    /**
-     * Flushes one upcoming value to the underlying store. Called from
-     * {@link #updateCache(Object, Entry, StoreOperation)} method in case when current map size exceeds
-     * critical size.
-     */
-    private void flushSingleValue() {
-        cacheOverflowCntr.incrementAndGet();
-
-        try {
-            Map<K, StatefulValue<K, V>> batch = null;
-
-            for (Map.Entry<K, StatefulValue<K, V>> e : writeCache.entrySet()) {
-                StatefulValue<K, V> val = e.getValue();
-
-                val.writeLock().lock();
-
-                try {
-                    ValueStatus status = val.status();
-
-                    if (acquired(status))
-                        // Another thread is helping us, continue to the next entry.
-                        continue;
-
-                    if (val.status() == ValueStatus.RETRY)
-                        retryEntriesCnt.decrementAndGet();
-
-                    assert retryEntriesCnt.get() >= 0;
-
-                    val.status(ValueStatus.PENDING);
-
-                    batch = Collections.singletonMap(e.getKey(), val);
-                }
-                finally {
-                    val.writeLock().unlock();
-                }
-
-                if (!batch.isEmpty()) {
-                    applyBatch(batch, false);
-
-                    cacheTotalOverflowCntr.incrementAndGet();
-
-                    return;
-                }
-            }
-        }
-        finally {
-            cacheOverflowCntr.decrementAndGet();
-        }
-    }
-
-    /**
-     * Performs batch operation on underlying store.
-     *
-     * @param valMap Batch map.
-     * @param initSes {@code True} if need to initialize session.
-     */
-    private void applyBatch(Map<K, StatefulValue<K, V>> valMap, boolean initSes) {
-        assert valMap.size() <= batchSize;
-
-        StoreOperation operation = null;
-
-        // Construct a map for underlying store
-        Map<K, Entry<? extends K, ? extends V>> batch = U.newLinkedHashMap(valMap.size());
-
-        for (Map.Entry<K, StatefulValue<K, V>> e : valMap.entrySet()) {
-            if (operation == null)
-                operation = e.getValue().operation();
-
-            assert operation == e.getValue().operation();
-
-            assert e.getValue().status() == ValueStatus.PENDING;
-
-            batch.put(e.getKey(), e.getValue().entry());
-        }
-
-        if (updateStore(operation, batch, initSes)) {
-            for (Map.Entry<K, StatefulValue<K, V>> e : valMap.entrySet()) {
-                StatefulValue<K, V> val = e.getValue();
-
-                val.writeLock().lock();
-
-                try {
-                    val.status(ValueStatus.FLUSHED);
-
-                    StatefulValue<K, V> prev = writeCache.remove(e.getKey());
-
-                    // Additional check to ensure consistency.
-                    assert prev == val : "Map value for key " + e.getKey() + " was updated during flush";
-
-                    val.signalFlushed();
-                }
-                finally {
-                    val.writeLock().unlock();
-                }
-            }
-        }
-        else {
-            // Exception occurred, we must set RETRY status
-            for (StatefulValue<K, V> val : valMap.values()) {
-                val.writeLock().lock();
-
-                try {
-                    val.status(ValueStatus.RETRY);
-
-                    retryEntriesCnt.incrementAndGet();
-
-                    val.signalFlushed();
-                }
-                finally {
-                    val.writeLock().unlock();
-                }
-            }
-        }
-    }
-
-    /**
-     * Tries to update store with the given values and returns {@code true} in case of success.
-     *
-     * <p/> If any exception in underlying store is occurred, this method checks the map size.
-     * If map size exceeds some critical value, then it returns {@code true} and this value will
-     * be lost. If map size does not exceed critical value, it will return false and value will
-     * be retained in write cache.
-     *
-     * @param operation Status indicating operation that should be performed.
-     * @param vals Key-Value map.
-     * @param initSes {@code True} if need to initialize session.
-     * @return {@code true} if value may be deleted from the write cache,
-     *         {@code false} otherwise
-     */
-    private boolean updateStore(StoreOperation operation,
-        Map<K, Entry<? extends K, ? extends  V>> vals,
-        boolean initSes) {
-
-        if (initSes && storeMgr != null)
-            storeMgr.initSession(null);
-
-        try {
-            boolean threwEx = true;
-
-            try {
-                switch (operation) {
-                    case PUT:
-                        store.writeAll(vals.values());
-
-                        break;
-
-                    case RMV:
-                        store.deleteAll(vals.keySet());
-
-                        break;
-
-                    default:
-                        assert false : "Unexpected operation: " + operation;
-                }
-
-                threwEx = false;
-
-                return true;
-            }
-            finally {
-                if (initSes && storeMgr != null)
-                    storeMgr.endSession(null, threwEx);
-            }
-        }
-        catch (Exception e) {
-            LT.warn(log, e, "Unable to update underlying store: " + store);
-
-            if (writeCache.sizex() > cacheCriticalSize || stopping.get()) {
-                for (Map.Entry<K, Entry<? extends K, ? extends  V>> entry : vals.entrySet()) {
-                    Object val = entry.getValue() != null ? entry.getValue().getValue() : null;
-
-                    log.warning("Failed to update store (value will be lost as current buffer size is greater " +
-                        "than 'cacheCriticalSize' or node has been stopped before store was repaired) [key=" +
-                        entry.getKey() + ", val=" + val + ", op=" + operation + "]");
-                }
-
-                return true;
-            }
-
-            return false;
-        }
-    }
-
-    /**
-     * Wakes up flushing threads if map size exceeded maximum value or in case of shutdown.
-     */
-    private void wakeUp() {
-        flushLock.lock();
-
-        try {
-            canFlush.signalAll();
-        }
-        finally {
-            flushLock.unlock();
-        }
-    }
-
-    /**
-     * Thread that performs time-based flushing of written values to the underlying storage.
-     */
-    private class Flusher extends GridWorker {
-        /** {@inheritDoc */
-        protected Flusher(String gridName, String name, IgniteLogger log) {
-            super(gridName, name, log);
-        }
-
-        /** {@inheritDoc} */
-        @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
-            while (!stopping.get() || writeCache.sizex() > 0) {
-                awaitOperationsAvailable();
-
-                flushCache(writeCache.entrySet().iterator());
-            }
-        }
-
-        /**
-         * This method awaits until enough elements in map are available or given timeout is over.
-         *
-         * @throws InterruptedException If awaiting was interrupted.
-         */
-        private void awaitOperationsAvailable() throws InterruptedException {
-            flushLock.lock();
-
-            try {
-                do {
-                    if (writeCache.sizex() <= cacheMaxSize || cacheMaxSize == 0) {
-                        if (cacheFlushFreq > 0)
-                            canFlush.await(cacheFlushFreq, TimeUnit.MILLISECONDS);
-                        else
-                            canFlush.await();
-                    }
-                }
-                while (writeCache.sizex() == 0 && !stopping.get());
-            }
-            finally {
-                flushLock.unlock();
-            }
-        }
-
-        /**
-         * Removes values from the write cache and performs corresponding operation
-         * on the underlying store.
-         *
-         * @param it Iterator for write cache.
-         */
-        private void flushCache(Iterator<Map.Entry<K,StatefulValue<K, V>>> it) {
-            StoreOperation operation = null;
-
-            Map<K, StatefulValue<K, V>> batch = null;
-            Map<K, StatefulValue<K, V>> pending  = U.newLinkedHashMap(batchSize);
-
-            while (it.hasNext()) {
-                Map.Entry<K, StatefulValue<K, V>> e = it.next();
-
-                StatefulValue<K, V> val = e.getValue();
-
-                val.writeLock().lock();
-
-                try {
-                    ValueStatus status = val.status();
-
-                    if (acquired(status))
-                        // Another thread is helping us, continue to the next entry.
-                        continue;
-
-                    if (status == ValueStatus.RETRY)
-                        retryEntriesCnt.decrementAndGet();
-
-                    assert retryEntriesCnt.get() >= 0;
-
-                    val.status(ValueStatus.PENDING);
-
-                    // We scan for the next operation and apply batch on operation change. Null means new batch.
-                    if (operation == null)
-                        operation = val.operation();
-
-                    if (operation != val.operation()) {
-                        // Operation is changed, so we need to perform a batch.
-                        batch = pending;
-                        pending = U.newLinkedHashMap(batchSize);
-
-                        operation = val.operation();
-
-                        pending.put(e.getKey(), val);
-                    }
-                    else
-                        pending.put(e.getKey(), val);
-
-                    if (pending.size() == batchSize) {
-                        batch = pending;
-                        pending = U.newLinkedHashMap(batchSize);
-
-                        operation = null;
-                    }
-                }
-                finally {
-                    val.writeLock().unlock();
-                }
-
-                if (batch != null && !batch.isEmpty()) {
-                    applyBatch(batch, true);
-                    batch = null;
-                }
-            }
-
-            // Process the remainder.
-            if (!pending.isEmpty())
-                applyBatch(pending, true);
-        }
-    }
-
-    /**
-     * For test purposes only.
-     *
-     * @return Write cache for the underlying store operations.
-     */
-    Map<K, StatefulValue<K, V>> writeCache() {
-        return writeCache;
-    }
-
-    /**
-     * Enumeration that represents possible operations on the underlying store.
-     */
-    private enum StoreOperation {
-        /** Put key-value pair to the underlying store. */
-        PUT,
-
-        /** Remove key from the underlying store. */
-        RMV
-    }
-
-    /**
-     * Enumeration that represents possible states of value in the map.
-     */
-    private enum ValueStatus {
-        /** Value is scheduled for write or delete from the underlying cache but has not been captured by flusher. */
-        NEW,
-
-        /** Value is captured by flusher and store operation is performed at the moment. */
-        PENDING,
-
-        /** Store operation has failed and it will be re-tried at the next flush. */
-        RETRY,
-
-        /** Store operation succeeded and this value will be removed by flusher. */
-        FLUSHED,
-    }
-
-    /**
-     * Checks if given status indicates pending or complete flush operation.
-     *
-     * @param status Status to check.
-     * @return {@code true} if status indicates any pending or complete store update operation.
-     */
-    private boolean acquired(ValueStatus status) {
-        return status == ValueStatus.PENDING || status == ValueStatus.FLUSHED;
-    }
-
-    /**
-     * A state-value-operation trio.
-     *
-     * @param <V> Value type.
-     */
-    private static class StatefulValue<K, V> extends ReentrantReadWriteLock {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /** Value. */
-        @GridToStringInclude
-        private Entry<? extends K, ? extends V> val;
-
-        /** Store operation. */
-        private StoreOperation storeOperation;
-
-        /** Value status. */
-        private ValueStatus valStatus;
-
-        /** Condition to wait for flush event */
-        private Condition flushCond = writeLock().newCondition();
-
-        /**
-         * Creates a state-value pair with {@link ValueStatus#NEW} status.
-         *
-         * @param val Value.
-         * @param storeOperation Store operation.
-         */
-        private StatefulValue(Entry<? extends K, ? extends V> val, StoreOperation storeOperation) {
-            assert storeOperation == StoreOperation.PUT || storeOperation == StoreOperation.RMV;
-
-            this.val = val;
-            this.storeOperation = storeOperation;
-            valStatus = ValueStatus.NEW;
-        }
-
-        /**
-         * @return Stored value.
-         */
-        private Entry<? extends K, ? extends V> entry() {
-            return val;
-        }
-
-        /**
-         * @return Store operation.
-         */
-        private StoreOperation operation() {
-            return storeOperation;
-        }
-
-        /**
-         * @return Value status
-         */
-        private ValueStatus status() {
-            return valStatus;
-        }
-
-        /**
-         * Updates value status.
-         *
-         * @param valStatus Value status.
-         */
-        private void status(ValueStatus valStatus) {
-            this.valStatus = valStatus;
-        }
-
-        /**
-         * Updates both value and value status.
-         *
-         * @param val Value.
-         * @param storeOperation Store operation.
-         * @param valStatus Value status.
-         */
-        private void update(@Nullable Entry<? extends K, ? extends V> val,
-            StoreOperation storeOperation,
-            ValueStatus valStatus) {
-            this.val = val;
-            this.storeOperation = storeOperation;
-            this.valStatus = valStatus;
-        }
-
-        /**
-         * Awaits a signal on flush condition
-         *
-         * @throws IgniteInterruptedCheckedException If thread was interrupted.
-         */
-        private void waitForFlush() throws IgniteInterruptedCheckedException {
-            U.await(flushCond);
-        }
-
-        /**
-         * Signals flush condition.
-         */
-        @SuppressWarnings({"SignalWithoutCorrespondingAwait"})
-        private void signalFlushed() {
-            flushCond.signalAll();
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean equals(Object o) {
-            if (this == o)
-                return true;
-
-            if (!(o instanceof StatefulValue))
-                return false;
-
-            StatefulValue other = (StatefulValue)o;
-
-            return F.eq(val, other.val) && F.eq(valStatus, other.valStatus);
-        }
-
-        /** {@inheritDoc} */
-        @Override public int hashCode() {
-            int res = val != null ? val.hashCode() : 0;
-
-            res = 31 * res + valStatus.hashCode();
-
-            return res;
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return S.toString(StatefulValue.class, this);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6629012/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
index 6c27566..f2dd0c4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
@@ -360,7 +360,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
     /** {@inheritDoc} */
     @Override public void localLoad(Collection<? extends K> keys, final ExpiryPolicy plc)
         throws IgniteCheckedException {
-        if (ctx.store().isLocalStore()) {
+        if (ctx.store().isLocal()) {
             super.localLoad(keys, plc);
 
             return;
@@ -377,7 +377,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
 
         Collection<KeyCacheObject> keys0 = ctx.cacheKeysView(keys);
 
-        ctx.store().loadAllFromStore(null, keys0, new CI2<KeyCacheObject, Object>() {
+        ctx.store().loadAll(null, keys0, new CI2<KeyCacheObject, Object>() {
             @Override public void apply(KeyCacheObject key, Object val) {
                 loadEntry(key, val, ver0, null, topVer, replicate, plc0);
             }
@@ -386,7 +386,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
 
     /** {@inheritDoc} */
     @Override public void localLoadCache(final IgniteBiPredicate<K, V> p, Object[] args) throws IgniteCheckedException {
-        if (ctx.store().isLocalStore()) {
+        if (ctx.store().isLocal()) {
             super.localLoadCache(p, args);
 
             return;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6629012/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
index 7195d1c..e6d5173 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
@@ -564,8 +564,8 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry {
                         log.debug("Entry has been cleared from swap storage: " + this);
                 }
 
-                if (cctx.store().isLocalStore())
-                    cctx.store().removeFromStore(null, keyValue(false));
+                if (cctx.store().isLocal())
+                    cctx.store().remove(null, keyValue(false));
 
                 rmv = true;
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6629012/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
index 4e72bd1..c433698 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
@@ -494,7 +494,7 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>
         try {
             GridCloseableIterator<Map.Entry<byte[], GridCacheSwapEntry>> it = cctx.swap().iterator(id);
 
-            boolean isLocStore = cctx.store().isLocalStore();
+            boolean isLocStore = cctx.store().isLocal();
 
             if (it != null) {
                 // We can safely remove these values because no entries will be created for evicted partition.
@@ -508,7 +508,7 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>
                     cctx.swap().remove(key);
 
                     if (isLocStore)
-                        cctx.store().removeFromStore(null, key.value(cctx.cacheObjectContext(), false));
+                        cctx.store().remove(null, key.value(cctx.cacheObjectContext(), false));
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6629012/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
index 99fb724..7e93946 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
@@ -945,7 +945,7 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo
             }
 
             try {
-                cctx.store().loadAllFromStore(
+                cctx.store().loadAll(
                     null,
                     loadMap.keySet(),
                     new CI2<KeyCacheObject, Object>() {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6629012/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index c376fa0..5e8a1f5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -1135,7 +1135,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
                         if (keys.size() > 1 &&                             // Several keys ...
                             writeThrough() &&                              // and store is enabled ...
-                            !ctx.store().isLocalStore() &&                 // and this is not local store ...
+                            !ctx.store().isLocal() &&                 // and this is not local store ...
                             !ctx.dr().receiveEnabled()                     // and no DR.
                         ) {
                             // This method can only be used when there are no replicated entries in the batch.
@@ -1614,7 +1614,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         if (needReload != null) {
             final Map<KeyCacheObject, Integer> idxMap = needReload;
 
-            ctx.store().loadAllFromStore(null, needReload.keySet(), new CI2<KeyCacheObject, Object>() {
+            ctx.store().loadAll(null, needReload.keySet(), new CI2<KeyCacheObject, Object>() {
                 @Override public void apply(KeyCacheObject k, Object v) {
                     Integer idx = idxMap.get(k);
 
@@ -1917,7 +1917,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                     putMap;
 
                 try {
-                    ctx.store().putAllToStore(null, F.viewReadOnly(storeMap, new C1<Object, IgniteBiTuple<Object, GridCacheVersion>>() {
+                    ctx.store().putAll(null, F.viewReadOnly(storeMap, new C1<Object, IgniteBiTuple<Object, GridCacheVersion>>() {
                         @Override public IgniteBiTuple<Object, GridCacheVersion> apply(Object v) {
                             return F.t(v, ver);
                         }
@@ -1940,7 +1940,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                     rmvKeys;
 
                 try {
-                    ctx.store().removeAllFromStore(null, storeKeys);
+                    ctx.store().removeAll(null, storeKeys);
                 }
                 catch (CacheStorePartialUpdateException e) {
                     storeErr = e;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6629012/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
index a9a6f23..98d9283 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
@@ -1306,7 +1306,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
         try {
             if (putMap != null) {
                 try {
-                    ctx.store().putAllToStore(null, F.viewReadOnly(putMap, new C1<Object, IgniteBiTuple<Object, GridCacheVersion>>() {
+                    ctx.store().putAll(null, F.viewReadOnly(putMap, new C1<Object, IgniteBiTuple<Object, GridCacheVersion>>() {
                         @Override public IgniteBiTuple<Object, GridCacheVersion> apply(Object v) {
                             return F.t(v, ver);
                         }
@@ -1320,7 +1320,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
             }
             else {
                 try {
-                    ctx.store().removeAllFromStore(null, rmvKeys);
+                    ctx.store().removeAll(null, rmvKeys);
                 }
                 catch (CacheStorePartialUpdateException e) {
                     storeErr = e;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6629012/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/CacheOsStoreManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/CacheOsStoreManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/CacheOsStoreManager.java
new file mode 100644
index 0000000..5fde622
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/CacheOsStoreManager.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.store;
+
+import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+
+/**
+ * Default store manager implementation.
+ */
+public class CacheOsStoreManager extends GridCacheStoreManagerAdapter {
+    /** Ignite context. */
+    private final GridKernalContext ctx;
+
+    /** Cache configuration. */
+    private final CacheConfiguration cfg;
+
+    /**
+     * Constructor.
+     *
+     * @param ctx Ignite context.
+     * @param cfg Cache configuration.
+     */
+    public CacheOsStoreManager(GridKernalContext ctx, CacheConfiguration cfg) {
+        this.ctx = ctx;
+        this.cfg = cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected GridKernalContext igniteContext() {
+        return ctx;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected CacheConfiguration cacheConfiguration() {
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected boolean convertPortable() {
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6629012/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/CacheStoreManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/CacheStoreManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/CacheStoreManager.java
new file mode 100644
index 0000000..d9f50ac
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/CacheStoreManager.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.store;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.store.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.processors.cache.transactions.*;
+import org.apache.ignite.internal.processors.cache.version.*;
+import org.apache.ignite.internal.util.lang.*;
+import org.apache.ignite.lang.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+
+/**
+ * Cache store manager interface.
+ */
+public interface CacheStoreManager<K, V> extends GridCacheManager<K, V> {
+    /**
+     * Initialize store manager.
+     *
+     * @param cfgStore   Actual store.
+     * @param sesHolders Session holders.
+     * @throws org.apache.ignite.IgniteCheckedException If failed.
+     */
+    public void initialize(@Nullable CacheStore<?, ?> cfgStore, Map<CacheStore, ThreadLocal> sesHolders)
+        throws IgniteCheckedException;
+
+    /**
+     * @return {@code true} If store configured.
+     */
+    public boolean configured();
+
+    /**
+     * @return Wrapped store.
+     */
+    public CacheStore<Object, Object> store();
+
+    /**
+     * @return Unwrapped store provided in configuration.
+     */
+    public CacheStore<?, ?> configuredStore();
+
+    /**
+     * @return {@code true} If local store is configured.
+     */
+    public boolean isLocal();
+
+    /**
+     * @return {@code True} is write-through is enabled.
+     */
+    public boolean isWriteThrough();
+
+    /**
+     * @return Whether DHT transaction can write to store from DHT.
+     */
+    public boolean isWriteToStoreFromDht();
+
+    /**
+     * Loads data from persistent store.
+     *
+     * @param tx Cache transaction.
+     * @param key Cache key.
+     * @return Loaded value, possibly <tt>null</tt>.
+     * @throws IgniteCheckedException If data loading failed.
+     */
+    @Nullable public Object load(@Nullable IgniteInternalTx tx, KeyCacheObject key) throws IgniteCheckedException;
+
+    /**
+     * Loads data from persistent store.
+     *
+     * @param tx Cache transaction.
+     * @param keys Cache keys.
+     * @param vis Closure.
+     * @return {@code True} if there is a persistent storage.
+     * @throws IgniteCheckedException If data loading failed.
+     */
+    public boolean loadAll(@Nullable IgniteInternalTx tx, Collection<? extends KeyCacheObject> keys,
+        IgniteBiInClosure<KeyCacheObject, Object> vis) throws IgniteCheckedException;
+
+    /**
+     * @param tx Cache transaction.
+     * @param keys Cache keys.
+     * @param vis Closure to apply for loaded elements.
+     * @throws IgniteCheckedException If data loading failed.
+     */
+    public void localStoreLoadAll(@Nullable IgniteInternalTx tx, Collection<? extends KeyCacheObject> keys,
+        final GridInClosure3<KeyCacheObject, Object, GridCacheVersion> vis) throws IgniteCheckedException;
+
+    /**
+     * Loads data from persistent store.
+     *
+     * @param vis Closer to cache loaded elements.
+     * @param args User arguments.
+     * @return {@code True} if there is a persistent storage.
+     * @throws IgniteCheckedException If data loading failed.
+     */
+    public boolean loadCache(final GridInClosure3<KeyCacheObject, Object, GridCacheVersion> vis, Object[] args)
+        throws IgniteCheckedException;
+
+    /**
+     * Puts key-value pair into storage.
+     *
+     * @param tx Cache transaction.
+     * @param key Key.
+     * @param val Value.
+     * @param ver Version.
+     * @return {@code true} If there is a persistent storage.
+     * @throws IgniteCheckedException If storage failed.
+     */
+    public boolean put(@Nullable IgniteInternalTx tx, Object key, Object val, GridCacheVersion ver)
+        throws IgniteCheckedException;
+
+    /**
+     * Puts key-value pair into storage.
+     *
+     * @param tx Cache transaction.
+     * @param map Map.
+     * @return {@code True} if there is a persistent storage.
+     * @throws IgniteCheckedException If storage failed.
+     */
+    public boolean putAll(@Nullable IgniteInternalTx tx, Map<Object, IgniteBiTuple<Object, GridCacheVersion>> map)
+        throws IgniteCheckedException;
+
+    /**
+     * @param tx Cache transaction.
+     * @param key Key.
+     * @return {@code True} if there is a persistent storage.
+     * @throws IgniteCheckedException If storage failed.
+     */
+    public boolean remove(@Nullable IgniteInternalTx tx, Object key) throws IgniteCheckedException;
+
+    /**
+     * @param tx Cache transaction.
+     * @param keys Key.
+     * @return {@code True} if there is a persistent storage.
+     * @throws IgniteCheckedException If storage failed.
+     */
+    public boolean removeAll(@Nullable IgniteInternalTx tx, Collection<Object> keys)
+        throws IgniteCheckedException;
+
+    /**
+     * @param tx Transaction.
+     * @param commit Commit.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void sessionEnd(IgniteInternalTx tx, boolean commit) throws IgniteCheckedException;
+
+    /**
+     * End session initiated by write-behind store.
+     */
+    public void writeBehindSessionInit();
+
+    /**
+     * End session initiated by write-behind store.
+     *
+     * @param threwEx If exception was thrown.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void writeBehindSessionEnd(boolean threwEx) throws IgniteCheckedException;
+
+    /**
+     * @throws IgniteCheckedException If failed.
+     */
+    public void forceFlush() throws IgniteCheckedException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6629012/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
new file mode 100644
index 0000000..f9a966c
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
@@ -0,0 +1,1111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.store;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.store.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.processors.cache.transactions.*;
+import org.apache.ignite.internal.processors.cache.version.*;
+import org.apache.ignite.internal.util.*;
+import org.apache.ignite.internal.util.lang.*;
+import org.apache.ignite.internal.util.tostring.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.lifecycle.*;
+import org.apache.ignite.transactions.*;
+import org.jetbrains.annotations.*;
+
+import javax.cache.*;
+import javax.cache.integration.*;
+import java.util.*;
+
+/**
+ * Store manager.
+ */
+@SuppressWarnings({"AssignmentToCatchBlockParameter", "unchecked"})
+public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapter implements CacheStoreManager {
+    /** */
+    private static final UUID SES_ATTR = UUID.randomUUID();
+
+    /** */
+    protected CacheStore<Object, Object> store;
+
+    /** */
+    protected CacheStore<?, ?> cfgStore;
+
+    /** */
+    private CacheStoreBalancingWrapper<Object, Object> singleThreadGate;
+
+    /** */
+    private ThreadLocal<SessionData> sesHolder;
+
+    /** */
+    private boolean locStore;
+
+    /** */
+    private boolean writeThrough;
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override public void initialize(@Nullable CacheStore cfgStore, Map sesHolders) throws IgniteCheckedException {
+        GridKernalContext ctx = igniteContext();
+        CacheConfiguration cfg = cacheConfiguration();
+
+        writeThrough = cfg.isWriteThrough();
+
+        this.cfgStore = cfgStore;
+
+        store = cacheStoreWrapper(ctx, cfgStore, cfg);
+
+        singleThreadGate = store == null ? null : new CacheStoreBalancingWrapper<>(store);
+
+        ThreadLocal<SessionData> sesHolder0 = null;
+
+        if (cfgStore != null) {
+            sesHolder0 = ((Map<CacheStore, ThreadLocal>)sesHolders).get(cfgStore);
+
+            if (sesHolder0 == null) {
+                ThreadLocalSession locSes = new ThreadLocalSession();
+
+                if (ctx.resource().injectStoreSession(cfgStore, locSes)) {
+                    sesHolder0 = locSes.sesHolder;
+
+                    sesHolders.put(cfgStore, sesHolder0);
+                }
+            }
+        }
+
+        sesHolder = sesHolder0;
+
+        locStore = U.hasAnnotation(cfgStore, CacheLocalStore.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isWriteThrough() {
+        return writeThrough;
+    }
+
+    /**
+     * Creates a wrapped cache store if write-behind cache is configured.
+     *
+     * @param ctx Kernal context.
+     * @param cfgStore Store provided in configuration.
+     * @param cfg Cache configuration.
+     * @return Instance if {@link GridCacheWriteBehindStore} if write-behind store is configured,
+     *         or user-defined cache store.
+     */
+    @SuppressWarnings({"unchecked"})
+    private CacheStore cacheStoreWrapper(GridKernalContext ctx,
+        @Nullable CacheStore cfgStore,
+        CacheConfiguration cfg) {
+        if (cfgStore == null || !cfg.isWriteBehindEnabled())
+            return cfgStore;
+
+        GridCacheWriteBehindStore store = new GridCacheWriteBehindStore(this,
+            ctx.gridName(),
+            cfg.getName(),
+            ctx.log(GridCacheWriteBehindStore.class),
+            cfgStore);
+
+        store.setFlushSize(cfg.getWriteBehindFlushSize());
+        store.setFlushThreadCount(cfg.getWriteBehindFlushThreadCount());
+        store.setFlushFrequency(cfg.getWriteBehindFlushFrequency());
+        store.setBatchSize(cfg.getWriteBehindBatchSize());
+
+        return store;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void start0() throws IgniteCheckedException {
+        if (store instanceof LifecycleAware) {
+            try {
+                // Avoid second start() call on store in case when near cache is enabled.
+                if (cctx.config().isWriteBehindEnabled()) {
+                    if (!cctx.isNear())
+                        ((LifecycleAware)store).start();
+                }
+            }
+            catch (Exception e) {
+                throw new IgniteCheckedException("Failed to start cache store: " + e, e);
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void stop0(boolean cancel) {
+        if (store instanceof LifecycleAware) {
+            try {
+                // Avoid second start() call on store in case when near cache is enabled.
+                if (cctx.config().isWriteBehindEnabled()) {
+                    if (!cctx.isNear())
+                        ((LifecycleAware)store).stop();
+                }
+            }
+            catch (Exception e) {
+                U.error(log(), "Failed to stop cache store.", e);
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isLocal() {
+        return locStore;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean configured() {
+        return store != null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public CacheStore<?, ?> configuredStore() {
+        return cfgStore;
+    }
+
+    /** {@inheritDoc} */
+    @Override @Nullable public Object load(@Nullable IgniteInternalTx tx, KeyCacheObject key)
+        throws IgniteCheckedException {
+        return loadFromStore(tx, key, true);
+    }
+
+    /**
+     * Loads data from persistent store.
+     *
+     * @param tx Cache transaction.
+     * @param key Cache key.
+     * @param convert Convert flag.
+     * @return Loaded value, possibly <tt>null</tt>.
+     * @throws IgniteCheckedException If data loading failed.
+     */
+    @Nullable private Object loadFromStore(@Nullable IgniteInternalTx tx,
+        KeyCacheObject key,
+        boolean convert)
+        throws IgniteCheckedException {
+        if (store != null) {
+            if (key.internal())
+                // Never load internal keys from store as they are never persisted.
+                return null;
+
+            Object storeKey = key.value(cctx.cacheObjectContext(), false);
+
+            if (convertPortable())
+                storeKey = cctx.unwrapPortableIfNeeded(storeKey, false);
+
+            if (log.isDebugEnabled())
+                log.debug("Loading value from store for key: " + storeKey);
+
+            sessionInit0(tx);
+
+            boolean thewEx = true;
+
+            Object val = null;
+
+            try {
+                val = singleThreadGate.load(storeKey);
+
+                thewEx = false;
+            }
+            catch (ClassCastException e) {
+                handleClassCastException(e);
+            }
+            catch (CacheLoaderException e) {
+                throw new IgniteCheckedException(e);
+            }
+            catch (Exception e) {
+                throw new IgniteCheckedException(new CacheLoaderException(e));
+            }
+            finally {
+                sessionEnd0(tx, thewEx);
+            }
+
+            if (log.isDebugEnabled())
+                log.debug("Loaded value from store [key=" + key + ", val=" + val + ']');
+
+            if (convert) {
+                val = convert(val);
+
+                return val;
+            }
+            else
+                return val;
+        }
+
+        return null;
+    }
+
+    /**
+     * @param val Internal value.
+     * @return User value.
+     */
+    private Object convert(Object val) {
+        if (val == null)
+            return null;
+
+        return locStore ? ((IgniteBiTuple<Object, GridCacheVersion>)val).get1() : val;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isWriteToStoreFromDht() {
+        return cctx.config().isWriteBehindEnabled() || locStore;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void localStoreLoadAll(@Nullable IgniteInternalTx tx, Collection keys, GridInClosure3 vis)
+        throws IgniteCheckedException {
+        assert store != null;
+        assert locStore;
+
+        loadAllFromStore(tx, keys, null, vis);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean loadAll(@Nullable IgniteInternalTx tx, Collection keys, IgniteBiInClosure vis)
+        throws IgniteCheckedException {
+        if (store != null) {
+            loadAllFromStore(tx, keys, vis, null);
+
+            return true;
+        }
+        else {
+            for (Object key : keys)
+                vis.apply(key, null);
+        }
+
+        return false;
+    }
+
+    /**
+     * @param tx Cache transaction.
+     * @param keys Keys to load.
+     * @param vis Key/value closure (only one of vis or verVis can be specified).
+     * @param verVis Key/value/version closure (only one of vis or verVis can be specified).
+     * @throws IgniteCheckedException If failed.
+     */
+    private void loadAllFromStore(@Nullable IgniteInternalTx tx,
+        Collection<? extends KeyCacheObject> keys,
+        @Nullable final IgniteBiInClosure<KeyCacheObject, Object> vis,
+        @Nullable final GridInClosure3<KeyCacheObject, Object, GridCacheVersion> verVis)
+        throws IgniteCheckedException {
+        assert vis != null ^ verVis != null;
+        assert verVis == null || locStore;
+
+        final boolean convert = verVis == null;
+
+        if (!keys.isEmpty()) {
+            if (keys.size() == 1) {
+                KeyCacheObject key = F.first(keys);
+
+                if (convert)
+                    vis.apply(key, load(tx, key));
+                else {
+                    IgniteBiTuple<Object, GridCacheVersion> t =
+                        (IgniteBiTuple<Object, GridCacheVersion>)loadFromStore(tx, key, false);
+
+                    if (t != null)
+                        verVis.apply(key, t.get1(), t.get2());
+                }
+
+                return;
+            }
+
+            Collection<Object> keys0;
+
+            if (convertPortable()) {
+                keys0 = F.viewReadOnly(keys, new C1<KeyCacheObject, Object>() {
+                    @Override public Object apply(KeyCacheObject key) {
+                        return cctx.unwrapPortableIfNeeded(key.value(cctx.cacheObjectContext(), false), false);
+                    }
+                });
+            }
+            else {
+                keys0 = F.viewReadOnly(keys, new C1<KeyCacheObject, Object>() {
+                    @Override public Object apply(KeyCacheObject key) {
+                        return key.value(cctx.cacheObjectContext(), false);
+                    }
+                });
+            }
+
+            if (log.isDebugEnabled())
+                log.debug("Loading values from store for keys: " + keys0);
+
+            sessionInit0(tx);
+
+            boolean thewEx = true;
+
+            try {
+                IgniteBiInClosure<Object, Object> c = new CI2<Object, Object>() {
+                    @SuppressWarnings("ConstantConditions")
+                    @Override public void apply(Object k, Object val) {
+                        if (convert) {
+                            Object v = convert(val);
+
+                            vis.apply(cctx.toCacheKeyObject(k), v);
+                        }
+                        else {
+                            IgniteBiTuple<Object, GridCacheVersion> v = (IgniteBiTuple<Object, GridCacheVersion>)val;
+
+                            if (v != null)
+                                verVis.apply(cctx.toCacheKeyObject(k), v.get1(), v.get2());
+                        }
+                    }
+                };
+
+                if (keys.size() > singleThreadGate.loadAllThreshold()) {
+                    Map<Object, Object> map = store.loadAll(keys0);
+
+                    if (map != null) {
+                        for (Map.Entry<Object, Object> e : map.entrySet())
+                            c.apply(cctx.toCacheKeyObject(e.getKey()), e.getValue());
+                    }
+                }
+                else
+                    singleThreadGate.loadAll(keys0, c);
+
+                thewEx = false;
+            }
+            catch (ClassCastException e) {
+                handleClassCastException(e);
+            }
+            catch (CacheLoaderException e) {
+                throw new IgniteCheckedException(e);
+            }
+            catch (Exception e) {
+                throw new IgniteCheckedException(new CacheLoaderException(e));
+            }
+            finally {
+                sessionEnd0(tx, thewEx);
+            }
+
+            if (log.isDebugEnabled())
+                log.debug("Loaded values from store for keys: " + keys0);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean loadCache(final GridInClosure3 vis, Object[] args) throws IgniteCheckedException {
+        if (store != null) {
+            if (log.isDebugEnabled())
+                log.debug("Loading all values from store.");
+
+            sessionInit0(null);
+
+            boolean thewEx = true;
+
+            try {
+                store.loadCache(new IgniteBiInClosure<Object, Object>() {
+                    @Override public void apply(Object k, Object o) {
+                        Object v;
+                        GridCacheVersion ver = null;
+
+                        if (locStore) {
+                            IgniteBiTuple<Object, GridCacheVersion> t = (IgniteBiTuple<Object, GridCacheVersion>)o;
+
+                            v = t.get1();
+                            ver = t.get2();
+                        }
+                        else
+                            v = o;
+
+                        KeyCacheObject cacheKey = cctx.toCacheKeyObject(k);
+
+                        vis.apply(cacheKey, v, ver);
+                    }
+                }, args);
+
+                thewEx = false;
+            }
+            catch (CacheLoaderException e) {
+                throw new IgniteCheckedException(e);
+            }
+            catch (Exception e) {
+                throw new IgniteCheckedException(new CacheLoaderException(e));
+            }
+            finally {
+                sessionEnd0(null, thewEx);
+            }
+
+            if (log.isDebugEnabled())
+                log.debug("Loaded all values from store.");
+
+            return true;
+        }
+
+        LT.warn(log, null, "Calling Cache.loadCache() method will have no effect, " +
+            "CacheConfiguration.getStore() is not defined for cache: " + cctx.namexx());
+
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean put(@Nullable IgniteInternalTx tx, Object key, Object val, GridCacheVersion ver)
+        throws IgniteCheckedException {
+        if (store != null) {
+            // Never persist internal keys.
+            if (key instanceof GridCacheInternal)
+                return true;
+
+            if (convertPortable()) {
+                key = cctx.unwrapPortableIfNeeded(key, false);
+                val = cctx.unwrapPortableIfNeeded(val, false);
+            }
+
+            if (log.isDebugEnabled())
+                log.debug("Storing value in cache store [key=" + key + ", val=" + val + ']');
+
+            sessionInit0(tx);
+
+            boolean thewEx = true;
+
+            try {
+                store.write(new CacheEntryImpl<>(key, locStore ? F.t(val, ver) : val));
+
+                thewEx = false;
+            }
+            catch (ClassCastException e) {
+                handleClassCastException(e);
+            }
+            catch (CacheWriterException e) {
+                throw new IgniteCheckedException(e);
+            }
+            catch (Exception e) {
+                throw new IgniteCheckedException(new CacheWriterException(e));
+            }
+            finally {
+                sessionEnd0(tx, thewEx);
+            }
+
+            if (log.isDebugEnabled())
+                log.debug("Stored value in cache store [key=" + key + ", val=" + val + ']');
+
+            return true;
+        }
+
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean putAll(@Nullable IgniteInternalTx tx, Map map) throws IgniteCheckedException {
+        if (F.isEmpty(map))
+            return true;
+
+        if (map.size() == 1) {
+            Map.Entry<Object, IgniteBiTuple<Object, GridCacheVersion>> e =
+                ((Map<Object, IgniteBiTuple<Object, GridCacheVersion>>)map).entrySet().iterator().next();
+
+            return put(tx, e.getKey(), e.getValue().get1(), e.getValue().get2());
+        }
+        else {
+            if (store != null) {
+                EntriesView entries = new EntriesView(map);
+
+                if (log.isDebugEnabled())
+                    log.debug("Storing values in cache store [entries=" + entries + ']');
+
+                sessionInit0(tx);
+
+                boolean thewEx = true;
+
+                try {
+                    store.writeAll(entries);
+
+                    thewEx = false;
+                }
+                catch (ClassCastException e) {
+                    handleClassCastException(e);
+                }
+                catch (Exception e) {
+                    if (!(e instanceof CacheWriterException))
+                        e = new CacheWriterException(e);
+
+                    if (!entries.isEmpty()) {
+                        List<Object> keys = new ArrayList<>(entries.size());
+
+                        for (Cache.Entry<?, ?> entry : entries)
+                            keys.add(entry.getKey());
+
+                        throw new CacheStorePartialUpdateException(keys, e);
+                    }
+
+                    throw new IgniteCheckedException(e);
+                }
+                finally {
+                    sessionEnd0(tx, thewEx);
+                }
+
+                if (log.isDebugEnabled())
+                    log.debug("Stored value in cache store [entries=" + entries + ']');
+
+                return true;
+            }
+
+            return false;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean remove(@Nullable IgniteInternalTx tx, Object key) throws IgniteCheckedException {
+        if (store != null) {
+            // Never remove internal key from store as it is never persisted.
+            if (key instanceof GridCacheInternal)
+                return false;
+
+            if (convertPortable())
+                key = cctx.unwrapPortableIfNeeded(key, false);
+
+            if (log.isDebugEnabled())
+                log.debug("Removing value from cache store [key=" + key + ']');
+
+            sessionInit0(tx);
+
+            boolean thewEx = true;
+
+            try {
+                store.delete(key);
+
+                thewEx = false;
+            }
+            catch (ClassCastException e) {
+                handleClassCastException(e);
+            }
+            catch (CacheWriterException e) {
+                throw new IgniteCheckedException(e);
+            }
+            catch (Exception e) {
+                throw new IgniteCheckedException(new CacheWriterException(e));
+            }
+            finally {
+                sessionEnd0(tx, thewEx);
+            }
+
+            if (log.isDebugEnabled())
+                log.debug("Removed value from cache store [key=" + key + ']');
+
+            return true;
+        }
+
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean removeAll(@Nullable IgniteInternalTx tx, Collection keys)
+        throws IgniteCheckedException {
+        if (F.isEmpty(keys))
+            return true;
+
+        if (keys.size() == 1) {
+            Object key = keys.iterator().next();
+
+            return remove(tx, key);
+        }
+
+        if (store != null) {
+            Collection<Object> keys0 = convertPortable() ? cctx.unwrapPortablesIfNeeded(keys, false) : keys;
+
+            if (log.isDebugEnabled())
+                log.debug("Removing values from cache store [keys=" + keys0 + ']');
+
+            sessionInit0(tx);
+
+            boolean thewEx = true;
+
+            try {
+                store.deleteAll(keys0);
+
+                thewEx = false;
+            }
+            catch (ClassCastException e) {
+                handleClassCastException(e);
+            }
+            catch (Exception e) {
+                if (!(e instanceof CacheWriterException))
+                    e = new CacheWriterException(e);
+
+                if (!keys0.isEmpty())
+                    throw new CacheStorePartialUpdateException(keys0, e);
+
+                throw new IgniteCheckedException(e);
+            }
+            finally {
+                sessionEnd0(tx, thewEx);
+            }
+
+            if (log.isDebugEnabled())
+                log.debug("Removed values from cache store [keys=" + keys0 + ']');
+
+            return true;
+        }
+
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public CacheStore<Object, Object> store() {
+        return store;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void forceFlush() throws IgniteCheckedException {
+        if (store instanceof GridCacheWriteBehindStore)
+            ((GridCacheWriteBehindStore)store).forceFlush();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void sessionEnd(IgniteInternalTx tx, boolean commit) throws IgniteCheckedException {
+        assert store != null;
+
+        sessionInit0(tx);
+
+        try {
+            store.sessionEnd(commit);
+        }
+        finally {
+            if (sesHolder != null) {
+                sesHolder.set(null);
+
+                tx.removeMeta(SES_ATTR);
+            }
+        }
+    }
+
+    /**
+     * @param e Class cast exception.
+     * @throws IgniteCheckedException Thrown exception.
+     */
+    private void handleClassCastException(ClassCastException e) throws IgniteCheckedException {
+        assert e != null;
+
+        if (e.getMessage() != null) {
+            throw new IgniteCheckedException("Cache store must work with portable objects if portables are " +
+                "enabled for cache [cacheName=" + cctx.namex() + ']', e);
+        }
+        else
+            throw e;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeBehindSessionInit() {
+        sessionInit0(null);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeBehindSessionEnd(boolean threwEx) throws IgniteCheckedException {
+        sessionEnd0(null, threwEx);
+    }
+
+    /**
+     * @param tx Current transaction.
+     */
+    private void sessionInit0(@Nullable IgniteInternalTx tx) {
+        if (sesHolder == null)
+            return;
+
+        assert sesHolder.get() == null;
+
+        SessionData ses;
+
+        if (tx != null) {
+            ses = tx.meta(SES_ATTR);
+
+            if (ses == null) {
+                ses = new SessionData(tx, cctx.name());
+
+                tx.addMeta(SES_ATTR, ses);
+            }
+            else
+                // Session cache name may change in cross-cache transaction.
+                ses.cacheName(cctx.name());
+        }
+        else
+            ses = new SessionData(null, cctx.name());
+
+        sesHolder.set(ses);
+    }
+
+    /**
+     * Clears session holder.
+     */
+    private void sessionEnd0(@Nullable IgniteInternalTx tx, boolean threwEx) throws IgniteCheckedException {
+        try {
+            if (tx == null)
+                store.sessionEnd(threwEx);
+        }
+        catch (Exception e) {
+            if (!threwEx)
+                throw U.cast(e);
+        }
+        finally {
+            if (sesHolder != null)
+                sesHolder.set(null);
+        }
+    }
+
+    /**
+     * @return Ignite context.
+     */
+    protected abstract GridKernalContext igniteContext();
+
+    /**
+     * @return Cache configuration.
+     */
+    protected abstract CacheConfiguration cacheConfiguration();
+
+    /**
+     * @return Convert-portable flag.
+     */
+    protected abstract boolean convertPortable();
+
+    /**
+     *
+     */
+    private static class SessionData {
+        /** */
+        @GridToStringExclude
+        private final IgniteInternalTx tx;
+
+        /** */
+        private String cacheName;
+
+        /** */
+        @GridToStringInclude
+        private Map<Object, Object> props;
+
+        /**
+         * @param tx Current transaction.
+         * @param cacheName Cache name.
+         */
+        private SessionData(@Nullable IgniteInternalTx tx, @Nullable String cacheName) {
+            this.tx = tx;
+            this.cacheName = cacheName;
+        }
+
+        /**
+         * @return Transaction.
+         */
+        @Nullable private Transaction transaction() {
+            return tx != null ? tx.proxy() : null;
+        }
+
+        /**
+         * @return Properties.
+         */
+        private Map<Object, Object> properties() {
+            if (props == null)
+                props = new GridLeanMap<>();
+
+            return props;
+        }
+
+        /**
+         * @return Cache name.
+         */
+        private String cacheName() {
+            return cacheName;
+        }
+
+        /**
+         * @param cacheName Cache name.
+         */
+        private void cacheName(String cacheName) {
+            this.cacheName = cacheName;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(SessionData.class, this, "tx", CU.txString(tx));
+        }
+    }
+
+    /**
+     *
+     */
+    private static class ThreadLocalSession implements CacheStoreSession {
+        /** */
+        private final ThreadLocal<SessionData> sesHolder = new ThreadLocal<>();
+
+        /** {@inheritDoc} */
+        @Nullable @Override public Transaction transaction() {
+            SessionData ses0 = sesHolder.get();
+
+            return ses0 != null ? ses0.transaction() : null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean isWithinTransaction() {
+            return transaction() != null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public <K1, V1> Map<K1, V1> properties() {
+            SessionData ses0 = sesHolder.get();
+
+            return ses0 != null ? (Map<K1, V1>)ses0.properties() : null;
+        }
+
+        /** {@inheritDoc} */
+        @Nullable @Override public String cacheName() {
+            SessionData ses0 = sesHolder.get();
+
+            return ses0 != null ? ses0.cacheName() : null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(ThreadLocalSession.class, this);
+        }
+    }
+
+    /**
+     *
+     */
+    private class EntriesView extends AbstractCollection<Cache.Entry<?, ?>> {
+        /** */
+        private final Map<?, IgniteBiTuple<?, GridCacheVersion>> map;
+
+        /** */
+        private Set<Object> rmvd;
+
+        /** */
+        private boolean cleared;
+
+        /**
+         * @param map Map.
+         */
+        private EntriesView(Map<?, IgniteBiTuple<?, GridCacheVersion>> map) {
+            assert map != null;
+
+            this.map = map;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int size() {
+            return cleared ? 0 : (map.size() - (rmvd != null ? rmvd.size() : 0));
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean isEmpty() {
+            return cleared || !iterator().hasNext();
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean contains(Object o) {
+            if (cleared || !(o instanceof Cache.Entry))
+                return false;
+
+            Cache.Entry<?, ?> e = (Cache.Entry<?, ?>)o;
+
+            return map.containsKey(e.getKey());
+        }
+
+        /** {@inheritDoc} */
+        @NotNull @Override public Iterator<Cache.Entry<?, ?>> iterator() {
+            if (cleared)
+                return F.emptyIterator();
+
+            final Iterator<Map.Entry<?, IgniteBiTuple<?, GridCacheVersion>>> it0 = (Iterator)map.entrySet().iterator();
+
+            return new Iterator<Cache.Entry<?, ?>>() {
+                /** */
+                private Cache.Entry<?, ?> cur;
+
+                /** */
+                private Cache.Entry<?, ?> next;
+
+                /**
+                 *
+                 */
+                {
+                    checkNext();
+                }
+
+                /**
+                 *
+                 */
+                private void checkNext() {
+                    while (it0.hasNext()) {
+                        Map.Entry<?, IgniteBiTuple<?, GridCacheVersion>> e = it0.next();
+
+                        Object k = e.getKey();
+
+                        if (rmvd != null && rmvd.contains(k))
+                            continue;
+
+                        Object v = locStore ? e.getValue() : e.getValue().get1();
+
+                        if (convertPortable()) {
+                            k = cctx.unwrapPortableIfNeeded(k, false);
+                            v = cctx.unwrapPortableIfNeeded(v, false);
+                        }
+
+                        next = new CacheEntryImpl<>(k, v);
+
+                        break;
+                    }
+                }
+
+                @Override public boolean hasNext() {
+                    return next != null;
+                }
+
+                @Override public Cache.Entry<?, ?> next() {
+                    if (next == null)
+                        throw new NoSuchElementException();
+
+                    cur = next;
+
+                    next = null;
+
+                    checkNext();
+
+                    return cur;
+                }
+
+                @Override public void remove() {
+                    if (cur == null)
+                        throw new IllegalStateException();
+
+                    addRemoved(cur);
+
+                    cur = null;
+                }
+            };
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean add(Cache.Entry<?, ?> entry) {
+            throw new UnsupportedOperationException();
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean addAll(Collection<? extends Cache.Entry<?, ?>> col) {
+            throw new UnsupportedOperationException();
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean remove(Object o) {
+            if (cleared || !(o instanceof Cache.Entry))
+                return false;
+
+            Cache.Entry<?, ?> e = (Cache.Entry<?, ?>)o;
+
+            if (rmvd != null && rmvd.contains(e.getKey()))
+                return false;
+
+            if (mapContains(e)) {
+                addRemoved(e);
+
+                return true;
+            }
+
+            return false;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean containsAll(Collection<?> col) {
+            if (cleared)
+                return false;
+
+            for (Object o : col) {
+                if (contains(o))
+                    return false;
+            }
+
+            return true;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean removeAll(Collection<?> col) {
+            if (cleared)
+                return false;
+
+            boolean modified = false;
+
+            for (Object o : col) {
+                if (remove(o))
+                    modified = true;
+            }
+
+            return modified;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean retainAll(Collection<?> col) {
+            if (cleared)
+                return false;
+
+            boolean modified = false;
+
+            for (Cache.Entry<?, ?> e : this) {
+                if (!col.contains(e)) {
+                    addRemoved(e);
+
+                    modified = true;
+                }
+            }
+
+            return modified;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void clear() {
+            cleared = true;
+        }
+
+        /**
+         * @param e Entry.
+         */
+        private void addRemoved(Cache.Entry<?, ?> e) {
+            if (rmvd == null)
+                rmvd = new HashSet<>();
+
+            rmvd.add(e.getKey());
+        }
+
+        /**
+         * @param e Entry.
+         * @return {@code True} if original map contains entry.
+         */
+        private boolean mapContains(Cache.Entry<?, ?> e) {
+            return map.containsKey(e.getKey());
+        }
+
+        /** {@inheritDoc} */
+        public String toString() {
+            Iterator<Cache.Entry<?, ?>> it = iterator();
+
+            if (!it.hasNext())
+                return "[]";
+
+            SB sb = new SB("[");
+
+            while (true) {
+                Cache.Entry<?, ?> e = it.next();
+
+                sb.a(e.toString());
+
+                if (!it.hasNext())
+                    return sb.a(']').toString();
+
+                sb.a(", ");
+            }
+        }
+    }
+}


Mime
View raw message