ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [15/50] [abbrv] ignite git commit: ignite-1088 Implemented store for multi jvm tests (cherry picked from commit f91b699)
Date Tue, 06 Sep 2016 14:39:37 GMT
ignite-1088 Implemented store for multi jvm tests
(cherry picked from commit f91b699)


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5b49dadd
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5b49dadd
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5b49dadd

Branch: refs/heads/ignite-1.5.31-1
Commit: 5b49dadd3741ee7d4ec92eb97919b15bcbd33482
Parents: f045558
Author: sboikov <sboikov@gridgain.com>
Authored: Fri Jul 1 12:16:56 2016 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Thu Jul 7 12:38:15 2016 +0300

----------------------------------------------------------------------
 .../cache/GridCacheAbstractMetricsSelfTest.java |   2 +-
 .../GridCacheInterceptorAbstractSelfTest.java   |   2 +-
 .../processors/cache/H2CacheStoreStrategy.java  | 468 +++++++++++++++++++
 .../processors/cache/MapCacheStoreStrategy.java | 145 ++++++
 .../cache/TestCacheStoreStrategy.java           |  96 ++++
 ...edOffHeapTieredMultiNodeFullApiSelfTest.java |   2 +-
 .../cache/IgniteCacheQueryIndexSelfTest.java    |   2 +-
 7 files changed, 713 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/5b49dadd/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractMetricsSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractMetricsSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractMetricsSelfTest.java
index 4c04df0..113a3b2 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractMetricsSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractMetricsSelfTest.java
@@ -906,7 +906,7 @@ public abstract class GridCacheAbstractMetricsSelfTest extends GridCacheAbstract
         }
 
         // Avoid reloading from store.
-        map.remove(key);
+        storeStgy.removeFromStore(key);
 
         assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicateX() {
             @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/ignite/blob/5b49dadd/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheInterceptorAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheInterceptorAbstractSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheInterceptorAbstractSelfTest.java
index f50a3e0..68bfb6f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheInterceptorAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheInterceptorAbstractSelfTest.java
@@ -1444,7 +1444,7 @@ public abstract class GridCacheInterceptorAbstractSelfTest extends GridCacheAbst
         interceptor.disabled = true;
 
         if (storeEnabled())
-            assertEquals("Unexpected store value", expVal, map.get(key));
+            assertEquals("Unexpected store value", expVal, storeStgy.getFromStore(key));
 
         try {
             for (int i = 0; i < gridCount(); i++)

http://git-wip-us.apache.org/repos/asf/ignite/blob/5b49dadd/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/H2CacheStoreStrategy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/H2CacheStoreStrategy.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/H2CacheStoreStrategy.java
new file mode 100644
index 0000000..ccb2994
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/H2CacheStoreStrategy.java
@@ -0,0 +1,468 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.StringReader;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Map;
+import javax.cache.Cache;
+import javax.cache.configuration.Factory;
+import javax.cache.integration.CacheLoaderException;
+import javax.cache.integration.CacheWriterException;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.store.CacheStore;
+import org.apache.ignite.cache.store.CacheStoreAdapter;
+import org.apache.ignite.cache.store.CacheStoreSession;
+import org.apache.ignite.cache.store.CacheStoreSessionListener;
+import org.apache.ignite.cache.store.jdbc.CacheJdbcStoreSessionListener;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiInClosure;
+import org.apache.ignite.resources.CacheStoreSessionResource;
+import org.h2.jdbcx.JdbcConnectionPool;
+import org.h2.tools.RunScript;
+import org.h2.tools.Server;
+
+/**
+ * {@link TestCacheStoreStrategy} backed by H2 in-memory database.
+ */
+public class H2CacheStoreStrategy implements TestCacheStoreStrategy {
+    /** Pool to get {@link Connection}s from. */
+    private final JdbcConnectionPool dataSrc;
+
+    /** Script that creates CACHE table. */
+    private static final String CREATE_CACHE_TABLE =
+        "create table if not exists CACHE(k binary not null, v binary not null, PRIMARY KEY(k));";
+
+    /** Script that creates the READS, WRITES and REMOVES statistics tables. */
+    private static final String CREATE_STATS_TABLES =
+        "create table if not exists READS(id bigint auto_increment);\n" +
+        "create table if not exists WRITES(id bigint auto_increment);\n" +
+        "create table if not exists REMOVES(id bigint auto_increment);";
+
+    /** Script that clears the READS, WRITES and REMOVES statistics tables. */
+    private static final String POPULATE_STATS_TABLE =
+        "delete from READS;\n" +
+        "delete from WRITES;\n" +
+        "delete from REMOVES;";
+
+
+    /**
+     * @throws IgniteCheckedException If failed.
+     */
+    public H2CacheStoreStrategy() throws IgniteCheckedException {
+        try {
+            Server.createTcpServer().start();
+            dataSrc = H2CacheStoreSessionListenerFactory.createDataSource();
+
+            try (Connection conn = connection()) {
+                RunScript.execute(conn, new StringReader(CREATE_CACHE_TABLE));
+                RunScript.execute(conn, new StringReader(CREATE_STATS_TABLES));
+                RunScript.execute(conn, new StringReader(POPULATE_STATS_TABLE));
+            }
+        }
+        catch (SQLException e) {
+            throw new IgniteCheckedException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public int getReads() {
+        return queryStats("reads");
+    }
+
+    /** {@inheritDoc} */
+    @Override public int getWrites() {
+        return queryStats("writes");
+    }
+
+    /** {@inheritDoc} */
+    @Override public int getRemoves() {
+        return queryStats("removes");
+    }
+
+    /**
+     * @param tbl Table name.
+     * @return Number of rows in the given statistics table.
+     */
+    private int queryStats(String tbl) {
+        return querySingleInt("select count(*) from " + tbl, "Failed to query store stats
[table=" + tbl + "]");
+    }
+
+    /** {@inheritDoc} */
+    @Override public int getStoreSize() {
+        return querySingleInt("select count(*) from CACHE;", "Failed to query number of rows
from CACHE table");
+    }
+
+    /** {@inheritDoc} */
+    @Override public void resetStore() {
+        try (Connection conn = connection()) {
+            RunScript.execute(conn, new StringReader("delete from CACHE;"));
+            RunScript.execute(conn, new StringReader(POPULATE_STATS_TABLE));
+        }
+        catch (SQLException e) {
+            throw new IgniteException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void putToStore(Object key, Object val) {
+        Connection conn = null;
+        try {
+            conn = connection();
+            H2CacheStore.putToDb(conn, key, val);
+        }
+        catch (SQLException e) {
+            throw new IgniteException(e);
+        }
+        finally {
+            U.closeQuiet(conn);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void putAllToStore(Map<?, ?> data) {
+        Connection conn = null;
+        PreparedStatement stmt = null;
+        try {
+            conn = connection();
+            stmt = conn.prepareStatement(H2CacheStore.MERGE);
+            for (Map.Entry<?, ?> e : data.entrySet()) {
+                stmt.setBinaryStream(1, new ByteArrayInputStream(H2CacheStore.serialize(e.getKey())));
+                stmt.setBinaryStream(2, new ByteArrayInputStream(H2CacheStore.serialize(e.getValue())));
+                stmt.addBatch();
+            }
+            stmt.executeBatch();
+        }
+        catch (SQLException e) {
+            throw new IgniteException(e);
+        }
+        finally {
+            U.closeQuiet(stmt);
+            U.closeQuiet(conn);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public Object getFromStore(Object key) {
+        Connection conn = null;
+        try {
+            conn = connection();
+            return H2CacheStore.getFromDb(conn, key);
+        }
+        catch (SQLException e) {
+            throw new IgniteException(e);
+        }
+        finally {
+            U.closeQuiet(conn);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void removeFromStore(Object key) {
+        Connection conn = null;
+        try {
+            conn = connection();
+            H2CacheStore.removeFromDb(conn, key);
+        }
+        catch (SQLException e) {
+            throw new IgniteException(e);
+        }
+        finally {
+            U.closeQuiet(conn);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isInStore(Object key) {
+        return getFromStore(key) != null;
+    }
+
+    /**
+     * @return New {@link Connection} from {@link #dataSrc}
+     * @throws SQLException if failed
+     */
+    private Connection connection() throws SQLException {
+        return dataSrc.getConnection();
+    }
+
+    /**
+     * Retrieves single int value from {@link ResultSet} returned by given query.
+     *
+     * @param qry Query string (fully populated, with params).
+     * @param errorMsg Message for {@link IgniteException} to bear in case of failure.
+     * @return Requested value
+     */
+    private int querySingleInt(String qry, String errorMsg) {
+        Connection conn = null;
+        PreparedStatement stmt = null;
+        ResultSet rs = null;
+        try {
+            conn = connection();
+            stmt = conn.prepareStatement(qry);
+            rs = stmt.executeQuery();
+            if (rs.next())
+                return rs.getInt(1);
+            else
+                throw new IgniteException(errorMsg);
+        }
+        catch (SQLException e) {
+            throw new IgniteException(e);
+        }
+        finally {
+            U.closeQuiet(rs);
+            U.closeQuiet(stmt);
+            U.closeQuiet(conn);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void updateCacheConfiguration(CacheConfiguration<Object, Object>
cfg) {
+        cfg.setCacheStoreSessionListenerFactories(new H2CacheStoreSessionListenerFactory());
+    }
+
+    /** {@inheritDoc} */
+    @Override public Factory<? extends CacheStore<Object, Object>> getStoreFactory()
{
+        return new H2StoreFactory();
+    }
+
+    /** Serializable H2 backed cache store factory. */
+    public static class H2StoreFactory implements Factory<CacheStore<Object, Object>>
{
+        /** {@inheritDoc} */
+        @Override public CacheStore<Object, Object> create() {
+            return new H2CacheStore();
+        }
+    }
+
+    /** Serializable {@link Factory} producing H2 backed {@link CacheStoreSessionListener}s. */
+    public static class H2CacheStoreSessionListenerFactory implements Factory<CacheStoreSessionListener>
{
+        /**
+         * @return Connection pool
+         */
+        static JdbcConnectionPool createDataSource() {
+            JdbcConnectionPool pool = JdbcConnectionPool.create("jdbc:h2:tcp://localhost/mem:TestDb;LOCK_MODE=0",
"sa", "");
+            pool.setMaxConnections(100);
+            return pool;
+        }
+
+        /** {@inheritDoc} */
+        @Override public CacheStoreSessionListener create() {
+            CacheJdbcStoreSessionListener lsnr = new CacheJdbcStoreSessionListener();
+            lsnr.setDataSource(createDataSource());
+            return lsnr;
+        }
+    }
+
+    /** H2 backed {@link CacheStoreAdapter} implementation. */
+    public static class H2CacheStore extends CacheStoreAdapter<Object, Object> {
+        /** Store session */
+        @CacheStoreSessionResource
+        private CacheStoreSession ses;
+
+        /** Template for a merge statement. */
+        private static final String MERGE = "merge into CACHE(k, v) values(?, ?);";
+
+        /** {@inheritDoc} */
+        @Override public void loadCache(IgniteBiInClosure<Object, Object> clo, Object...
args) {
+            Connection conn = ses.attachment();
+            assert conn != null;
+
+            Statement stmt = null;
+            ResultSet rs = null;
+            try {
+                stmt = conn.createStatement();
+                rs = stmt.executeQuery("select * from CACHE");
+                while (rs.next())
+                    clo.apply(deserialize(rs.getBytes(1)), deserialize(rs.getBytes(2)));
+            }
+            catch (SQLException e) {
+                throw new IgniteException(e);
+            }
+            finally {
+                U.closeQuiet(rs);
+                U.closeQuiet(stmt);
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object load(Object key) throws CacheLoaderException {
+            try {
+                Connection conn = ses.attachment();
+                Object res = getFromDb(conn, key);
+                updateStats("reads");
+                return res;
+            }
+            catch (SQLException e) {
+                throw new CacheLoaderException("Failed to load object [key=" + key + ']',
e);
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public void write(Cache.Entry<?, ?> entry) throws CacheWriterException
{
+            try {
+                Connection conn = ses.attachment();
+                putToDb(conn, entry.getKey(), entry.getValue());
+                updateStats("writes");
+            }
+            catch (SQLException e) {
+                throw new CacheWriterException("Failed to write object [key=" + entry.getKey()
+ ", " +
+                    "val=" + entry.getValue() + ']', e);
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public void delete(Object key) throws CacheWriterException {
+            try {
+                Connection conn = ses.attachment();
+                removeFromDb(conn, key);
+                updateStats("removes");
+            }
+            catch (SQLException e) {
+                throw new CacheWriterException("Failed to delete object [key=" + key + ']',
e);
+            }
+        }
+
+        /**
+         * Selects the value stored under the given key from H2 and deserializes it from bytes.
+         *
+         * @param conn {@link Connection} to use.
+         * @param key Key to look for.
+         * @return Stored object or null if the key is missing from DB.
+         * @throws SQLException If failed.
+         */
+        static Object getFromDb(Connection conn, Object key) throws SQLException {
+            PreparedStatement stmt = null;
+            ResultSet rs = null;
+            try {
+                stmt = conn.prepareStatement("select v from CACHE where k = ?");
+                stmt.setBinaryStream(1, new ByteArrayInputStream(H2CacheStore.serialize(key)));
+                rs = stmt.executeQuery();
+                return rs.next() ? H2CacheStore.deserialize(rs.getBytes(1)) : null;
+            }
+            finally {
+                U.closeQuiet(rs);
+                U.closeQuiet(stmt);
+            }
+        }
+
+        /**
+         * Puts key-value pair to H2.
+         *
+         * @param conn {@link Connection} to use.
+         * @param key Key.
+         * @param val Value.
+         * @throws SQLException If failed.
+         */
+        static void putToDb(Connection conn, Object key, Object val) throws SQLException
{
+            PreparedStatement stmt = null;
+            try {
+                stmt = conn.prepareStatement(H2CacheStore.MERGE);
+                stmt.setBinaryStream(1, new ByteArrayInputStream(H2CacheStore.serialize(key)));
+                stmt.setBinaryStream(2, new ByteArrayInputStream(H2CacheStore.serialize(val)));
+                stmt.executeUpdate();
+            }
+            finally {
+                U.closeQuiet(stmt);
+            }
+        }
+
+        /**
+         * Removes given key and its value from H2.
+         *
+         * @param conn {@link Connection} to invoke query upon.
+         * @param key Key to remove.
+         * @throws SQLException if failed.
+         */
+        static void removeFromDb(Connection conn, Object key) throws SQLException {
+            PreparedStatement stmt = null;
+            try {
+                stmt = conn.prepareStatement("delete from CACHE where k = ?");
+                stmt.setBinaryStream(1, new ByteArrayInputStream(H2CacheStore.serialize(key)));
+                stmt.executeUpdate();
+            }
+            finally {
+                U.closeQuiet(stmt);
+            }
+        }
+
+        /**
+         * Increments stored stats for given operation.
+         *
+         * @param tblName Table name
+         */
+        private void updateStats(String tblName) {
+            Connection conn = ses.attachment();
+            assert conn != null;
+            Statement stmt = null;
+            try {
+                stmt = conn.createStatement();
+                stmt.executeUpdate("insert into " + tblName + " default values");
+            }
+            catch (SQLException e) {
+                throw new IgniteException("Failed to update H2 store usage stats", e);
+            }
+            finally {
+                U.closeQuiet(stmt);
+            }
+        }
+
+        /**
+         * Turns given arbitrary object to byte array.
+         *
+         * @param obj Object to serialize
+         * @return Bytes representation of given object.
+         */
+        static byte[] serialize(Object obj) {
+            try (ByteArrayOutputStream b = new ByteArrayOutputStream()) {
+                try (ObjectOutputStream o = new ObjectOutputStream(b)) {
+                    o.writeObject(obj);
+                }
+                return b.toByteArray();
+            }
+            catch (Exception e) {
+                throw new IgniteException("Failed to serialize object to byte array [obj="
+ obj, e);
+            }
+        }
+
+        /**
+         * Deserializes an object from its byte array representation.
+         *
+         * @param bytes Byte array representation of the object.
+         * @return Deserialized object.
+         */
+        public static Object deserialize(byte[] bytes) {
+            try (ByteArrayInputStream b = new ByteArrayInputStream(bytes)) {
+                try (ObjectInputStream o = new ObjectInputStream(b)) {
+                    return o.readObject();
+                }
+            }
+            catch (Exception e) {
+                throw new IgniteException("Failed to deserialize object from byte array",
e);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5b49dadd/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/MapCacheStoreStrategy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/MapCacheStoreStrategy.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/MapCacheStoreStrategy.java
new file mode 100644
index 0000000..800d781
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/MapCacheStoreStrategy.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.cache.Cache;
+import javax.cache.configuration.Factory;
+import javax.cache.configuration.FactoryBuilder;
+import org.apache.ignite.cache.store.CacheStore;
+import org.apache.ignite.cache.store.CacheStoreAdapter;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.lang.IgniteBiInClosure;
+import org.jsr166.ConcurrentHashMap8;
+
+/**
+ * {@link TestCacheStoreStrategy} implemented as a wrapper around {@link #map}
+ */
+public class MapCacheStoreStrategy implements TestCacheStoreStrategy {
+    /** Removes counter. */
+    private final static AtomicInteger removes = new AtomicInteger();
+
+    /** Writes counter. */
+    private final static AtomicInteger writes = new AtomicInteger();
+
+    /** Reads counter. */
+    private final static AtomicInteger reads = new AtomicInteger();
+
+    /** Store map. */
+    private final static Map<Object, Object> map = new ConcurrentHashMap8<>();
+
+    /** {@inheritDoc} */
+    @Override public int getReads() {
+        return reads.get();
+    }
+
+    /** {@inheritDoc} */
+    @Override public int getWrites() {
+        return writes.get();
+    }
+
+    /** {@inheritDoc} */
+    @Override public int getRemoves() {
+        return removes.get();
+    }
+
+    /** {@inheritDoc} */
+    @Override public int getStoreSize() {
+        return map.size();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void resetStore() {
+        map.clear();
+
+        reads.set(0);
+        writes.set(0);
+        removes.set(0);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void putToStore(Object key, Object val) {
+        map.put(key, val);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void putAllToStore(Map<?, ?> data) {
+        map.putAll(data);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Object getFromStore(Object key) {
+        return map.get(key);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void removeFromStore(Object key) {
+        map.remove(key);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isInStore(Object key) {
+        return map.containsKey(key);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void updateCacheConfiguration(CacheConfiguration<Object, Object>
cfg) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public Factory<? extends CacheStore<Object, Object>> getStoreFactory()
{
+        return FactoryBuilder.factoryOf(MapCacheStore.class);
+    }
+
+    /** Serializable {@link #map} backed cache store factory */
+    public static class MapStoreFactory implements Factory<CacheStore<Object, Object>>
{
+        /** {@inheritDoc} */
+        @Override public CacheStore<Object, Object> create() {
+            return new MapCacheStore();
+        }
+    }
+
+    /** {@link CacheStore} backed by {@link #map} */
+    public static class MapCacheStore extends CacheStoreAdapter<Object, Object> {
+        /** {@inheritDoc} */
+        @Override public void loadCache(IgniteBiInClosure<Object, Object> clo, Object...
args) {
+            for (Map.Entry<Object, Object> e : map.entrySet())
+                clo.apply(e.getKey(), e.getValue());
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object load(Object key) {
+            reads.incrementAndGet();
+            return map.get(key);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void write(Cache.Entry<?, ?> e) {
+            writes.incrementAndGet();
+            map.put(e.getKey(), e.getValue());
+        }
+
+        /** {@inheritDoc} */
+        @Override public void delete(Object key) {
+            removes.incrementAndGet();
+            map.remove(key);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5b49dadd/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/TestCacheStoreStrategy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/TestCacheStoreStrategy.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/TestCacheStoreStrategy.java
new file mode 100644
index 0000000..9ee174a
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/TestCacheStoreStrategy.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.Map;
+import javax.cache.configuration.Factory;
+import org.apache.ignite.cache.store.CacheStore;
+import org.apache.ignite.configuration.CacheConfiguration;
+
+/**
+ * Interface for cache store backend manipulation and stats routines.
+ */
+public interface TestCacheStoreStrategy {
+    /**
+     * @return Number of reads to store.
+     */
+    public int getReads();
+
+    /**
+     * @return Number of writes to store.
+     */
+    public int getWrites();
+
+    /**
+     * @return Number of removals from store.
+     */
+    public int getRemoves();
+
+    /**
+     * @return Total number of items in the store.
+     */
+    public int getStoreSize();
+
+    /**
+     * Clear store contents and reset the read/write/remove counters.
+     */
+    public void resetStore();
+
+    /**
+     * Put entry to cache store.
+     *
+     * @param key Key.
+     * @param val Value.
+     */
+    public void putToStore(Object key, Object val);
+
+    /**
+     * @param data Items to put to store.
+     */
+    public void putAllToStore(Map<?, ?> data);
+
+    /**
+     * @param key Key to look for.
+     * @return {@link Object} pointed to by given key or {@code null} if no object is present.
+     */
+    public Object getFromStore(Object key);
+
+    /**
+     * @param key Key to remove from store.
+     */
+    public void removeFromStore(Object key);
+
+    /**
+     * @param key to look for.
+     * @return {@code True} if object pointed to by key is in store, false otherwise.
+     */
+    public boolean isInStore(Object key);
+
+    /**
+     * Called from {@link GridCacheAbstractSelfTest#cacheConfiguration(String)},
+     * this method allows implementations to tune cache config.
+     *
+     * @param cfg {@link CacheConfiguration} to tune.
+     */
+    public void updateCacheConfiguration(CacheConfiguration<Object, Object> cfg);
+
+    /**
+     * @return {@link Factory} for write-through storage emulator.
+     */
+    public Factory<? extends CacheStore<Object, Object>> getStoreFactory();
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5b49dadd/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedOffHeapTieredMultiNodeFullApiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedOffHeapTieredMultiNodeFullApiSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedOffHeapTieredMultiNodeFullApiSelfTest.java
index cbcc739..0386510 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedOffHeapTieredMultiNodeFullApiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedOffHeapTieredMultiNodeFullApiSelfTest.java
@@ -68,6 +68,6 @@ public class GridCachePartitionedOffHeapTieredMultiNodeFullApiSelfTest extends
G
         assertEquals(5, primaryCache.localPeek(key, CachePeekMode.ONHEAP).intValue());
         assertNull(primaryCache.localPeek(key, CachePeekMode.OFFHEAP));
         assertEquals(5, cache.get(key).intValue());
-        assertEquals(5, map.get(key));
+        assertEquals(5, storeStgy.getFromStore(key));
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/5b49dadd/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryIndexSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryIndexSelfTest.java
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryIndexSelfTest.java
index d834eb3..73b665e 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryIndexSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryIndexSelfTest.java
@@ -74,7 +74,7 @@ public class IgniteCacheQueryIndexSelfTest extends GridCacheAbstractSelfTest
{
      */
     public void testWithStoreLoad() throws Exception {
         for (int i = 0; i < ENTRY_CNT; i++)
-            putToStore(i, new CacheValue(i));
+            storeStgy.putToStore(i, new CacheValue(i));
 
         IgniteCache<Integer, CacheValue> cache0 = grid(0).cache(null);
 


Mime
View raw message