ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From akuznet...@apache.org
Subject incubator-ignite git commit: # IGNITE-32: Tests for store.
Date Thu, 29 Jan 2015 03:56:12 GMT
Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-32 e716d8278 -> fbf76c785


# IGNITE-32: Tests for store.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/fbf76c78
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/fbf76c78
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/fbf76c78

Branch: refs/heads/ignite-32
Commit: fbf76c785d5fcb9c0982c9092588a8927afd8d0b
Parents: e716d82
Author: AKuznetsov <akuznetsov@gridgain.com>
Authored: Thu Jan 29 10:56:05 2015 +0700
Committer: AKuznetsov <akuznetsov@gridgain.com>
Committed: Thu Jan 29 10:56:05 2015 +0700

----------------------------------------------------------------------
 .../ignite/cache/store/jdbc/JdbcCacheStore.java |   2 +
 ...AbstractCacheStoreMultithreadedSelfTest.java | 125 ----
 ...ractJdbcCacheStoreMultithreadedSelfTest.java | 201 ++++++
 .../PojoCacheStoreMultitreadedSelfTest.java     | 109 ----
 .../store/jdbc/PojoCacheStoreSelfTest.java      | 627 ------------------
 .../PojoJdbcCacheStoreMultitreadedSelfTest.java |  34 +
 .../store/jdbc/PojoJdbcCacheStoreTest.java      | 630 +++++++++++++++++++
 7 files changed, 867 insertions(+), 861 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fbf76c78/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/JdbcCacheStore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/JdbcCacheStore.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/JdbcCacheStore.java
index 1f1084d..6e4bda5 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/JdbcCacheStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/JdbcCacheStore.java
@@ -716,6 +716,8 @@ public abstract class JdbcCacheStore<K, V> extends CacheStore<K, V> {
         throws CacheWriterException {
         assert entries != null;
 
+        init();
+
         Connection conn = null;
 
         try {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fbf76c78/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/AbstractCacheStoreMultithreadedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/AbstractCacheStoreMultithreadedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/AbstractCacheStoreMultithreadedSelfTest.java
deleted file mode 100644
index 0eb110c..0000000
--- a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/AbstractCacheStoreMultithreadedSelfTest.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.cache.store.jdbc;
-
-import org.apache.ignite.cache.*;
-import org.apache.ignite.cache.store.jdbc.model.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
-import org.apache.ignite.testframework.junits.common.*;
-import org.jetbrains.annotations.*;
-
-import java.sql.*;
-import java.util.*;
-import java.util.concurrent.*;
-
-/**
- *
- */
-public abstract class AbstractCacheStoreMultithreadedSelfTest<T extends JdbcCacheStore> extends GridCommonAbstractTest {
-    /** Default connection URL (value is <tt>jdbc:h2:mem:jdbcCacheStore;DB_CLOSE_DELAY=-1</tt>). */
-    protected static final String DFLT_CONN_URL = "jdbc:h2:mem:autoCacheStore;DB_CLOSE_DELAY=-1";
-
-    /** IP finder. */
-    protected static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
-
-    /** Number of transactions. */
-    private static final int TX_CNT = 1000;
-
-    /** Number of transactions. */
-    private static final int BATCH_CNT = 2000;
-
-    /** Cache store. */
-    protected T store;
-
-    /** {@inheritDoc} */
-    @Override protected void beforeTestsStarted() throws Exception {
-        store = store();
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void beforeTest() throws Exception {
-        Class.forName("org.h2.Driver");
-        Connection conn = DriverManager.getConnection(DFLT_CONN_URL, "sa", "");
-
-        Statement stmt = conn.createStatement();
-
-        stmt.executeUpdate("DROP TABLE IF EXISTS Organization");
-        stmt.executeUpdate("DROP TABLE IF EXISTS Person");
-
-        stmt.executeUpdate("CREATE TABLE Organization (id integer PRIMARY KEY, name varchar(50), city varchar(50))");
-        stmt.executeUpdate("CREATE TABLE Person (id integer PRIMARY KEY, org_id integer, name varchar(50))");
-
-        stmt.executeUpdate("CREATE INDEX Org_Name_IDX On Organization (name)");
-        stmt.executeUpdate("CREATE INDEX Org_Name_City_IDX On Organization (name, city)");
-        stmt.executeUpdate("CREATE INDEX Person_Name_IDX1 On Person (name)");
-        stmt.executeUpdate("CREATE INDEX Person_Name_IDX2 On Person (name desc)");
-
-        conn.commit();
-
-        U.closeQuiet(stmt);
-
-        U.closeQuiet(conn);
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void afterTest() throws Exception {
-        stopAllGrids();
-    }
-
-    /**
-     * @return New store.
-     * @throws Exception In case of error.
-     */
-    protected abstract T store() throws Exception;
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testMultithreadedPutAll() throws Exception {
-        startGrid();
-
-        multithreaded(new Callable<Object>() {
-            private final Random rnd = new Random();
-
-            @Nullable @Override public Object call() throws Exception {
-                for (int i = 0; i < TX_CNT; i++) {
-                    int cnt = rnd.nextInt(BATCH_CNT);
-
-                    Map<Object, Object> map = U.newHashMap(cnt);
-
-                    for (int j = 0; j < cnt; j++) {
-                        int id = rnd.nextInt();
-
-                        if (rnd.nextBoolean())
-                            map.put(new OrganizationKey(id), new Organization(id, "Name" + id, "City" + id));
-                        else
-                            map.put(new PersonKey(id), new Person(id, rnd.nextInt(), "Name" + id));
-                    }
-
-                    GridCache<Object, Object> cache = cache();
-
-                    cache.putAll(map);
-                }
-
-                return null;
-            }
-        }, 8, "putAll");
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fbf76c78/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/AbstractJdbcCacheStoreMultithreadedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/AbstractJdbcCacheStoreMultithreadedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/AbstractJdbcCacheStoreMultithreadedSelfTest.java
new file mode 100644
index 0000000..d4ecf42
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/AbstractJdbcCacheStoreMultithreadedSelfTest.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.jdbc;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cache.query.*;
+import org.apache.ignite.cache.store.jdbc.model.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.junits.common.*;
+import org.jetbrains.annotations.*;
+import org.springframework.beans.*;
+import org.springframework.beans.factory.xml.*;
+import org.springframework.context.support.*;
+import org.springframework.core.io.*;
+
+import javax.cache.configuration.*;
+import java.io.*;
+import java.net.*;
+import java.sql.*;
+import java.util.*;
+import java.util.concurrent.*;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
+import static org.apache.ignite.cache.CacheMode.*;
+
+/**
+ * Base class for multithreaded {@code putAll} tests of {@link JdbcCacheStore} implementations.
+ */
+public abstract class AbstractJdbcCacheStoreMultithreadedSelfTest<T extends JdbcCacheStore> extends GridCommonAbstractTest {
+    /** Default connection URL (value is <tt>jdbc:h2:mem:autoCacheStore;DB_CLOSE_DELAY=-1</tt>). */
+    protected static final String DFLT_CONN_URL = "jdbc:h2:mem:autoCacheStore;DB_CLOSE_DELAY=-1";
+
+    /** IP finder. */
+    protected static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** Number of transactions. */
+    private static final int TX_CNT = 1000;
+
+    /** Upper bound (exclusive) on the number of entries generated for a single {@code putAll} batch. */
+    private static final int BATCH_CNT = 2000;
+
+    /** Cache store. */
+    protected T store;
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        store = store();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        Class.forName("org.h2.Driver");
+        Connection conn = DriverManager.getConnection(DFLT_CONN_URL, "sa", "");
+
+        Statement stmt = conn.createStatement();
+
+        stmt.executeUpdate("DROP TABLE IF EXISTS Organization");
+        stmt.executeUpdate("DROP TABLE IF EXISTS Person");
+
+        stmt.executeUpdate("CREATE TABLE Organization (id integer PRIMARY KEY, name varchar(50), city varchar(50))");
+        stmt.executeUpdate("CREATE TABLE Person (id integer PRIMARY KEY, org_id integer, name varchar(50))");
+
+        stmt.executeUpdate("CREATE INDEX Org_Name_IDX On Organization (name)");
+        stmt.executeUpdate("CREATE INDEX Org_Name_City_IDX On Organization (name, city)");
+        stmt.executeUpdate("CREATE INDEX Person_Name_IDX1 On Person (name)");
+        stmt.executeUpdate("CREATE INDEX Person_Name_IDX2 On Person (name desc)");
+
+        conn.commit();
+
+        U.closeQuiet(stmt);
+
+        U.closeQuiet(conn);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+    }
+
+    /**
+     * @return New store.
+     * @throws Exception In case of error.
+     */
+    protected abstract T store() throws Exception;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration c = super.getConfiguration(gridName);
+
+        TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+        disco.setIpFinder(IP_FINDER);
+
+        c.setDiscoverySpi(disco);
+
+        CacheConfiguration cc = defaultCacheConfiguration();
+
+        cc.setCacheMode(PARTITIONED);
+        cc.setAtomicityMode(ATOMIC);
+        cc.setSwapEnabled(false);
+        cc.setWriteBehindEnabled(false);
+
+        UrlResource metaUrl;
+
+        try {
+            metaUrl = new UrlResource(new File("modules/core/src/test/config/store/jdbc/Ignite.xml").toURI().toURL());
+        }
+        catch (MalformedURLException e) {
+            throw new IgniteCheckedException("Failed to resolve metadata path [err=" + e.getMessage() + ']', e);
+        }
+
+        try {
+            GenericApplicationContext springCtx = new GenericApplicationContext();
+
+            new XmlBeanDefinitionReader(springCtx).loadBeanDefinitions(metaUrl);
+
+            springCtx.refresh();
+
+            Collection<CacheQueryTypeMetadata> tp = springCtx.getBeansOfType(CacheQueryTypeMetadata.class).values();
+
+            CacheQueryConfiguration cq = new CacheQueryConfiguration();
+
+            cq.setTypeMetadata(tp);
+
+            cc.setQueryConfiguration(cq);
+        }
+        catch (BeansException e) {
+            if (X.hasCause(e, ClassNotFoundException.class))
+                throw new IgniteCheckedException("Failed to instantiate Spring XML application context " +
+                    "(make sure all classes used in Spring configuration are present at CLASSPATH) " +
+                    "[springUrl=" + metaUrl + ']', e);
+            else
+                throw new IgniteCheckedException("Failed to instantiate Spring XML application context [springUrl=" +
+                    metaUrl + ", err=" + e.getMessage() + ']', e);
+        }
+
+        cc.setCacheStoreFactory(new FactoryBuilder.SingletonFactory(store));
+        cc.setReadThrough(true);
+        cc.setWriteThrough(true);
+        cc.setLoadPreviousValue(true);
+
+        c.setCacheConfiguration(cc);
+
+        return c;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testMultithreadedPutAll() throws Exception {
+        startGrid();
+
+        multithreaded(new Callable<Object>() {
+            private final Random rnd = new Random();
+
+            @Nullable @Override public Object call() throws Exception {
+                for (int i = 0; i < TX_CNT; i++) {
+                    int cnt = rnd.nextInt(BATCH_CNT);
+
+                    Map<Object, Object> map = U.newHashMap(cnt);
+
+                    for (int j = 0; j < cnt; j++) {
+                        int id = rnd.nextInt();
+
+                        if (rnd.nextBoolean())
+                            map.put(new OrganizationKey(id), new Organization(id, "Name" + id, "City" + id));
+                        else
+                            map.put(new PersonKey(id), new Person(id, rnd.nextInt(), "Name" + id));
+                    }
+
+                    GridCache<Object, Object> cache = cache();
+
+                    cache.putAll(map);
+                }
+
+                return null;
+            }
+        }, 8, "putAll");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fbf76c78/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/PojoCacheStoreMultitreadedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/PojoCacheStoreMultitreadedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/PojoCacheStoreMultitreadedSelfTest.java
deleted file mode 100644
index d59e3f7..0000000
--- a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/PojoCacheStoreMultitreadedSelfTest.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.cache.store.jdbc;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cache.*;
-import org.apache.ignite.cache.query.*;
-import org.apache.ignite.configuration.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.spi.discovery.tcp.*;
-import org.h2.jdbcx.*;
-import org.springframework.beans.*;
-import org.springframework.beans.factory.xml.*;
-import org.springframework.context.support.*;
-import org.springframework.core.io.*;
-
-import javax.cache.configuration.*;
-import java.io.*;
-import java.net.*;
-import java.util.*;
-
-import static org.apache.ignite.cache.CacheAtomicityMode.*;
-import static org.apache.ignite.cache.CacheMode.*;
-
-/**
- *
- */
-public class PojoCacheStoreMultitreadedSelfTest extends AbstractCacheStoreMultithreadedSelfTest<JdbcPojoCacheStore> {
-    /** {@inheritDoc} */
-    @Override protected JdbcPojoCacheStore store() throws Exception {
-        JdbcPojoCacheStore store = new JdbcPojoCacheStore();
-
-        store.setDataSource(JdbcConnectionPool.create(DFLT_CONN_URL, "sa", ""));
-
-        UrlResource metaUrl;
-
-        try {
-            metaUrl = new UrlResource(new File("modules/core/src/test/config/store/jdbc/Ignite.xml").toURI().toURL());
-        }
-        catch (MalformedURLException e) {
-            throw new IgniteCheckedException("Failed to resolve metadata path [err=" + e.getMessage() + ']', e);
-        }
-
-        try {
-            GenericApplicationContext springCtx = new GenericApplicationContext();
-
-            new XmlBeanDefinitionReader(springCtx).loadBeanDefinitions(metaUrl);
-
-            springCtx.refresh();
-
-// TODO IGNITE-32 FIXME
-//            Collection<CacheQueryTypeMetadata> typeMetadata =
-//                springCtx.getBeansOfType(CacheQueryTypeMetadata.class).values();
-        }
-        catch (BeansException e) {
-            if (X.hasCause(e, ClassNotFoundException.class))
-                throw new IgniteCheckedException("Failed to instantiate Spring XML application context " +
-                    "(make sure all classes used in Spring configuration are present at CLASSPATH) " +
-                    "[springUrl=" + metaUrl + ']', e);
-            else
-                throw new IgniteCheckedException("Failed to instantiate Spring XML application context [springUrl=" +
-                    metaUrl + ", err=" + e.getMessage() + ']', e);
-        }
-
-        return store;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
-        IgniteConfiguration c = super.getConfiguration(gridName);
-
-        TcpDiscoverySpi disco = new TcpDiscoverySpi();
-
-        disco.setIpFinder(IP_FINDER);
-
-        c.setDiscoverySpi(disco);
-
-        CacheConfiguration cc = defaultCacheConfiguration();
-
-        cc.setCacheMode(PARTITIONED);
-        cc.setAtomicityMode(ATOMIC);
-        cc.setSwapEnabled(false);
-        cc.setWriteBehindEnabled(false);
-
-        cc.setCacheStoreFactory(new FactoryBuilder.SingletonFactory(store));
-        cc.setReadThrough(true);
-        cc.setWriteThrough(true);
-        cc.setLoadPreviousValue(true);
-
-        c.setCacheConfiguration(cc);
-
-        return c;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fbf76c78/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/PojoCacheStoreSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/PojoCacheStoreSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/PojoCacheStoreSelfTest.java
deleted file mode 100644
index 534b362..0000000
--- a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/PojoCacheStoreSelfTest.java
+++ /dev/null
@@ -1,627 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.cache.store.jdbc;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cache.query.*;
-import org.apache.ignite.cache.store.*;
-import org.apache.ignite.cache.store.jdbc.dialect.*;
-import org.apache.ignite.cache.store.jdbc.model.*;
-import org.apache.ignite.internal.processors.cache.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.testframework.*;
-import org.apache.ignite.testframework.junits.cache.*;
-import org.apache.ignite.testframework.junits.common.*;
-import org.apache.ignite.transactions.*;
-import org.h2.jdbcx.*;
-import org.jetbrains.annotations.*;
-import org.springframework.beans.*;
-import org.springframework.beans.factory.xml.*;
-import org.springframework.context.support.*;
-import org.springframework.core.io.*;
-
-import javax.cache.*;
-import java.io.*;
-import java.net.*;
-import java.sql.*;
-import java.util.*;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.*;
-
-import static org.apache.ignite.testframework.junits.cache.GridAbstractCacheStoreSelfTest.*;
-
-/**
- * Class for {@code PojoCacheStore} tests.
- */
-public class PojoCacheStoreSelfTest extends GridCommonAbstractTest {
-    /** Default connection URL (value is <tt>jdbc:h2:mem:jdbcCacheStore;DB_CLOSE_DELAY=-1</tt>). */
-    protected static final String DFLT_CONN_URL = "jdbc:h2:mem:autoCacheStore;DB_CLOSE_DELAY=-1";
-
-    /** Organization count. */
-    protected static final int ORGANIZATION_CNT = 1000;
-
-    /** Person count. */
-    protected static final int PERSON_CNT = 100000;
-
-    /** */
-    protected TestThreadLocalCacheSession ses = new TestThreadLocalCacheSession();
-
-    /** */
-    protected final JdbcPojoCacheStore store;
-
-    /**
-     * @throws Exception If failed.
-     */
-    @SuppressWarnings({"AbstractMethodCallInConstructor", "OverriddenMethodCallDuringObjectConstruction"})
-    public PojoCacheStoreSelfTest() throws Exception {
-        super(false);
-
-        store = store();
-
-        inject(store);
-    }
-
-    /**
-     * @return Store.
-     */
-    protected JdbcPojoCacheStore store() throws IgniteCheckedException {
-        JdbcPojoCacheStore store = new JdbcPojoCacheStore();
-
-        store.setDataSource(JdbcConnectionPool.create(DFLT_CONN_URL, "sa", ""));
-
-        UrlResource metaUrl;
-
-        try {
-            metaUrl = new UrlResource(new File("modules/core/src/test/config/store/jdbc/Ignite.xml").toURI().toURL());
-        }
-        catch (MalformedURLException e) {
-            throw new IgniteCheckedException("Failed to resolve metadata path [err=" + e.getMessage() + ']', e);
-        }
-
-        try {
-            GenericApplicationContext springCtx = new GenericApplicationContext();
-
-            new XmlBeanDefinitionReader(springCtx).loadBeanDefinitions(metaUrl);
-
-            springCtx.refresh();
-
-            Collection<CacheQueryTypeMetadata> typeMetadata =
-                springCtx.getBeansOfType(CacheQueryTypeMetadata.class).values();
-
-            Map<Integer, Map<Object, JdbcCacheStore.EntryMapping>> cacheMappings = new ConcurrentHashMap<>();
-
-            JdbcDialect dialect = store.resolveDialect();
-
-            GridTestUtils.setFieldValue(store, JdbcCacheStore.class, "dialect", dialect);
-
-            Map<Object, JdbcCacheStore.EntryMapping> entryMappings = U.newHashMap(typeMetadata.size());
-
-            for (CacheQueryTypeMetadata type : typeMetadata)
-                entryMappings.put(store.keyId(type.getKeyType()), new JdbcCacheStore.EntryMapping(dialect, type));
-
-            store.buildTypeCache(typeMetadata);
-
-            cacheMappings.put(0, Collections.unmodifiableMap(entryMappings));
-
-            GridTestUtils.setFieldValue(store, JdbcCacheStore.class, "cacheMappings", cacheMappings);
-        }
-        catch (BeansException e) {
-            if (X.hasCause(e, ClassNotFoundException.class))
-                throw new IgniteCheckedException("Failed to instantiate Spring XML application context " +
-                    "(make sure all classes used in Spring configuration are present at CLASSPATH) " +
-                    "[springUrl=" + metaUrl + ']', e);
-            else
-                throw new IgniteCheckedException("Failed to instantiate Spring XML application context [springUrl=" +
-                    metaUrl + ", err=" + e.getMessage() + ']', e);
-        }
-
-        return store;
-    }
-
-    /**
-     * @param store Store.
-     * @throws Exception If failed.
-     */
-    protected void inject(JdbcCacheStore store) throws Exception {
-        getTestResources().inject(store);
-
-        GridTestUtils.setFieldValue(store, CacheStore.class, "ses", ses);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testLoadCache() throws Exception {
-        Connection conn = DriverManager.getConnection(DFLT_CONN_URL, "sa", "");
-
-        Statement stmt = conn.createStatement();
-
-        PreparedStatement orgStmt = conn.prepareStatement("INSERT INTO Organization(id, name, city) VALUES (?, ?, ?)");
-
-        for (int i = 0; i < ORGANIZATION_CNT; i++) {
-            orgStmt.setInt(1, i);
-            orgStmt.setString(2, "name" + i);
-            orgStmt.setString(3, "city" + i % 10);
-
-            orgStmt.addBatch();
-        }
-
-        orgStmt.executeBatch();
-
-        conn.commit();
-
-        PreparedStatement prnStmt = conn.prepareStatement("INSERT INTO Person(id, org_id, name) VALUES (?, ?, ?)");
-
-        for (int i = 0; i < PERSON_CNT; i++) {
-            prnStmt.setInt(1, i);
-            prnStmt.setInt(2, i % 100);
-            prnStmt.setString(3, "name" + i);
-
-            prnStmt.addBatch();
-        }
-
-        prnStmt.executeBatch();
-
-        conn.commit();
-
-        U.closeQuiet(stmt);
-
-        U.closeQuiet(conn);
-
-        final Collection<OrganizationKey> orgKeys = new ConcurrentLinkedQueue<>();
-        final Collection<PersonKey> prnKeys = new ConcurrentLinkedQueue<>();
-
-        IgniteBiInClosure<Object, Object> c = new CI2<Object, Object>() {
-            @Override public void apply(Object k, Object v) {
-                if (k instanceof OrganizationKey && v instanceof Organization)
-                    orgKeys.add((OrganizationKey)k);
-                else if (k instanceof PersonKey && v instanceof Person)
-                    prnKeys.add((PersonKey)k);
-            }
-        };
-
-        store.loadCache(c);
-
-        assertEquals(ORGANIZATION_CNT, orgKeys.size());
-        assertEquals(PERSON_CNT, prnKeys.size());
-
-        store.deleteAll(orgKeys);
-        store.deleteAll(prnKeys);
-
-        orgKeys.clear();
-        prnKeys.clear();
-
-        store.loadCache(c);
-
-        assertTrue(orgKeys.isEmpty());
-        assertTrue(prnKeys.isEmpty());
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testStore() throws Exception {
-        // Create dummy transaction
-        IgniteTx tx = new DummyTx();
-
-        ses.newSession(tx);
-
-        OrganizationKey k1 = new OrganizationKey(1);
-        Organization v1 = new Organization(1, "Name1", "City1");
-
-        OrganizationKey k2 = new OrganizationKey(2);
-        Organization v2 = new Organization(2, "Name2", "City2");
-
-        store.write(new CacheEntryImpl<>(k1, v1));
-        store.write(new CacheEntryImpl<>(k2, v2));
-
-        store.txEnd(true);
-
-        ses.newSession(null);
-
-        assertEquals(v1, store.load(k1));
-        assertEquals(v2, store.load(k2));
-
-        ses.newSession(tx);
-
-        OrganizationKey k3 = new OrganizationKey(3);
-
-        assertNull(store.load(k3));
-
-        store.delete(k1);
-
-        store.txEnd(true);
-
-        assertNull(store.load(k1));
-        assertEquals(v2, store.load(k2));
-
-        ses.newSession(null);
-
-        assertNull(store.load(k3));
-    }
-
-    /**
-     * @throws IgniteCheckedException if failed.
-     */
-    public void testRollback() throws IgniteCheckedException {
-        IgniteTx tx = new DummyTx();
-
-        ses.newSession(tx);
-
-        OrganizationKey k1 = new OrganizationKey(1);
-        Organization v1 = new Organization(1, "Name1", "City1");
-
-        // Put.
-        store.write(new CacheEntryImpl<>(k1, v1));
-
-        store.txEnd(false); // Rollback.
-
-        tx = new DummyTx();
-
-        ses.newSession(tx);
-
-        assertNull(store.load(k1));
-
-        OrganizationKey k2 = new OrganizationKey(2);
-        Organization v2 = new Organization(2, "Name2", "City2");
-
-        // Put all.
-        assertNull(store.load(k2));
-
-        Collection<Cache.Entry<?, ?>> col = new ArrayList<>();
-
-        col.add(new CacheEntryImpl<>(k2, v2));
-
-        store.writeAll(col);
-
-        store.txEnd(false); // Rollback.
-
-        tx = new DummyTx();
-
-        ses.newSession(tx);
-
-        assertNull(store.load(k2));
-
-        OrganizationKey k3 = new OrganizationKey(3);
-        Organization v3 = new Organization(3, "Name3", "City3");
-
-        col = new ArrayList<>();
-
-        col.add(new CacheEntryImpl<>(k3, v3));
-
-        store.writeAll(col);
-
-        store.txEnd(true); // Commit.
-
-        tx = new DummyTx();
-
-        ses.newSession(tx);
-
-        assertEquals(v3, store.load(k3));
-
-        OrganizationKey k4 = new OrganizationKey(4);
-        Organization v4 = new Organization(4, "Name4", "City4");
-
-        store.write(new CacheEntryImpl<>(k4, v4));
-
-        store.txEnd(false); // Rollback.
-
-        tx = new DummyTx();
-
-        ses.newSession(tx);
-
-        assertNull(store.load(k4));
-
-        assertEquals(v3, store.load(k3));
-
-        // Remove.
-        store.delete(k3);
-
-        store.txEnd(false); // Rollback.
-
-        tx = new DummyTx();
-
-        ses.newSession(tx);
-
-        assertEquals(v3, store.load(k3));
-
-        // Remove all.
-        store.deleteAll(Arrays.asList(k3));
-
-        store.txEnd(false); // Rollback.
-
-        tx = new DummyTx();
-
-        ses.newSession(tx);
-
-        assertEquals(v3, store.load(k3));
-    }
-
-    /**
-     */
-    public void testAllOpsWithTXNoCommit() {
-        doTestAllOps(new DummyTx(), false);
-    }
-
-    /**
-     */
-    public void testAllOpsWithTXCommit() {
-        doTestAllOps(new DummyTx(), true);
-    }
-
-    /**
-     */
-    public void testAllOpsWithoutTX() {
-        doTestAllOps(null, false);
-    }
-
-    /**
-     * @param tx Transaction.
-     * @param commit Commit.
-     */
-    private void doTestAllOps(@Nullable IgniteTx tx, boolean commit) {
-        try {
-            ses.newSession(tx);
-
-            final OrganizationKey k1 = new OrganizationKey(1);
-            final Organization v1 = new Organization(1, "Name1", "City1");
-
-            store.write(new CacheEntryImpl<>(k1, v1));
-
-            if (tx != null && commit) {
-                store.txEnd(true);
-
-                tx = new DummyTx();
-
-                ses.newSession(tx);
-            }
-
-            if (tx == null || commit)
-                assertEquals(v1, store.load(k1));
-
-            Collection<Cache.Entry<?, ?>> col = new ArrayList<>();
-
-            final OrganizationKey k2 = new OrganizationKey(2);
-            final Organization v2 = new Organization(2, "Name2", "City2");
-
-            final OrganizationKey k3 = new OrganizationKey(3);
-            final Organization v3 = new Organization(3, "Name3", "City3");
-
-            col.add(new CacheEntryImpl<>(k2, v2));
-            col.add(new CacheEntryImpl<>(k3, v3));
-
-            store.writeAll(col);
-
-            if (tx != null && commit) {
-                store.txEnd(true);
-
-                tx = new DummyTx();
-
-                ses.newSession(tx);
-            }
-
-            final AtomicInteger cntr = new AtomicInteger();
-
-            final OrganizationKey no_such_key = new OrganizationKey(4);
-
-            if (tx == null || commit) {
-                Map<Object, Object> loaded = store.loadAll(Arrays.asList(k1, k2, k3, no_such_key));
-
-                for (Map.Entry<Object, Object> e : loaded.entrySet()) {
-                    Object key = e.getKey();
-                    Object val = e.getValue();
-
-                    if (k1.equals(key))
-                        assertEquals(v1, val);
-
-                    if (k2.equals(key))
-                        assertEquals(v2, val);
-
-                    if (k3.equals(key))
-                        assertEquals(v3, val);
-
-                    if (no_such_key.equals(key))
-                        fail();
-
-                    cntr.incrementAndGet();
-                }
-
-                assertEquals(3, cntr.get());
-            }
-
-            store.deleteAll(Arrays.asList(k2, k3));
-
-            if (tx != null && commit) {
-                store.txEnd(true);
-
-                tx = new DummyTx();
-
-                ses.newSession(tx);
-            }
-
-            if (tx == null || commit) {
-                assertNull(store.load(k2));
-                assertNull(store.load(k3));
-                assertEquals(v1, store.load(k1));
-            }
-
-            store.delete(k1);
-
-            if (tx != null && commit) {
-                store.txEnd(true);
-
-                tx = new DummyTx();
-
-                ses.newSession(tx);
-            }
-
-            if (tx == null || commit)
-                assertNull(store.load(k1));
-        }
-        finally {
-            if (tx != null)
-                store.txEnd(false);
-        }
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testSimpleMultithreading() throws Exception {
-        final Random rnd = new Random();
-
-        final Queue<OrganizationKey> queue = new LinkedBlockingQueue<>();
-
-        multithreaded(new Callable<Object>() {
-            @Nullable @Override public Object call() throws Exception {
-                for (int i = 0; i < 1000; i++) {
-                    IgniteTx tx = rnd.nextBoolean() ? new DummyTx() : null;
-
-                    ses.newSession(tx);
-
-                    int op = rnd.nextInt(10);
-
-                    boolean queueEmpty = false;
-
-                    if (op < 4) { // Load.
-                        OrganizationKey key = queue.poll();
-
-                        if (key == null)
-                            queueEmpty = true;
-                        else {
-                            if (rnd.nextBoolean())
-                                assertNotNull(store.load(key));
-                            else {
-                                Map<Object, Object> loaded = store.loadAll(Collections.singleton(key));
-
-                                assertEquals(1, loaded.size());
-
-                                Map.Entry<Object, Object> e = loaded.entrySet().iterator().next();
-
-                                OrganizationKey k = (OrganizationKey)e.getKey();
-                                Organization v = (Organization)e.getValue();
-
-                                assertTrue(k.getId().equals(v.getId()));
-                            }
-
-                            if (tx != null)
-                                store.txEnd(true);
-
-                            queue.add(key);
-                        }
-                    }
-                    else if (op < 6) { // Remove.
-                        OrganizationKey key = queue.poll();
-
-                        if (key == null)
-                            queueEmpty = true;
-                        else {
-                            if (rnd.nextBoolean())
-                                store.delete(key);
-                            else
-                                store.deleteAll(Collections.singleton(key));
-
-                            if (tx != null)
-                                store.txEnd(true);
-                        }
-                    }
-                    else { // Update.
-                        OrganizationKey key = queue.poll();
-
-                        if (key == null)
-                            queueEmpty = true;
-                        else {
-                            Organization val =
-                                new Organization(key.getId(), "Name" + key.getId(), "City" + key.getId());
-
-                            Cache.Entry<OrganizationKey, Organization> entry = new CacheEntryImpl<>(key, val);
-
-                            if (rnd.nextBoolean())
-                                store.write(entry);
-                            else {
-                                Collection<Cache.Entry<?, ?>> col = new ArrayList<>();
-
-                                col.add(entry);
-
-                                store.writeAll(col);
-                            }
-
-                            if (tx != null)
-                                store.txEnd(true);
-
-                            queue.add(key);
-                        }
-                    }
-
-                    if (queueEmpty) { // Add.
-                        OrganizationKey key = new OrganizationKey(rnd.nextInt());
-                        Organization val = new Organization(key.getId(), "Name" + key.getId(), "City" + key.getId());
-
-                        Cache.Entry<OrganizationKey, Organization> entry = new CacheEntryImpl<>(key, val);
-
-                        if (rnd.nextBoolean())
-                            store.write(entry);
-                        else {
-                            Collection<Cache.Entry<?, ?>> col = new ArrayList<>();
-
-                            col.add(entry);
-
-                            store.writeAll(col);
-                        }
-
-                        if (tx != null)
-                            store.txEnd(true);
-
-                        queue.add(key);
-                    }
-                }
-
-                return null;
-            }
-        }, 37);
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void beforeTest() throws Exception {
-        super.beforeTest();
-
-        Class.forName("org.h2.Driver");
-        Connection conn = DriverManager.getConnection(DFLT_CONN_URL, "sa", "");
-
-        Statement stmt = conn.createStatement();
-
-        stmt.executeUpdate("DROP TABLE IF EXISTS Organization");
-        stmt.executeUpdate("DROP TABLE IF EXISTS Person");
-
-        stmt.executeUpdate("CREATE TABLE Organization (id integer PRIMARY KEY, name varchar(50), city varchar(50))");
-        stmt.executeUpdate("CREATE TABLE Person (id integer PRIMARY KEY, org_id integer, name varchar(50))");
-
-        stmt.executeUpdate("CREATE INDEX Org_Name_IDX On Organization (name)");
-        stmt.executeUpdate("CREATE INDEX Org_Name_City_IDX On Organization (name, city)");
-        stmt.executeUpdate("CREATE INDEX Person_Name_IDX1 On Person (name)");
-        stmt.executeUpdate("CREATE INDEX Person_Name_IDX2 On Person (name desc)");
-
-        conn.commit();
-
-        U.closeQuiet(stmt);
-
-        U.closeQuiet(conn);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fbf76c78/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/PojoJdbcCacheStoreMultitreadedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/PojoJdbcCacheStoreMultitreadedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/PojoJdbcCacheStoreMultitreadedSelfTest.java
new file mode 100644
index 0000000..b146566
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/PojoJdbcCacheStoreMultitreadedSelfTest.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.jdbc;
+
+import org.h2.jdbcx.*;
+
+/**
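+ * Binds {@code AbstractJdbcCacheStoreMultithreadedSelfTest} to a {@link JdbcPojoCacheStore} over an H2 connection pool.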
+ */
+public class PojoJdbcCacheStoreMultitreadedSelfTest extends AbstractJdbcCacheStoreMultithreadedSelfTest<JdbcPojoCacheStore> {
+    /** {@inheritDoc} */
+    @Override protected JdbcPojoCacheStore store() throws Exception {
+        JdbcPojoCacheStore store = new JdbcPojoCacheStore();
+
+        store.setDataSource(JdbcConnectionPool.create(DFLT_CONN_URL, "sa", ""));
+
+        return store;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fbf76c78/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/PojoJdbcCacheStoreTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/PojoJdbcCacheStoreTest.java b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/PojoJdbcCacheStoreTest.java
new file mode 100644
index 0000000..736f5eb
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/PojoJdbcCacheStoreTest.java
@@ -0,0 +1,630 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.jdbc;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.query.*;
+import org.apache.ignite.cache.store.*;
+import org.apache.ignite.cache.store.jdbc.dialect.*;
+import org.apache.ignite.cache.store.jdbc.model.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.testframework.*;
+import org.apache.ignite.testframework.junits.cache.*;
+import org.apache.ignite.testframework.junits.common.*;
+import org.apache.ignite.transactions.*;
+import org.h2.jdbcx.*;
+import org.jetbrains.annotations.*;
+import org.springframework.beans.*;
+import org.springframework.beans.factory.xml.*;
+import org.springframework.context.support.*;
+import org.springframework.core.io.*;
+
+import javax.cache.*;
+import java.io.*;
+import java.net.*;
+import java.sql.*;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+import static org.apache.ignite.testframework.junits.cache.GridAbstractCacheStoreSelfTest.*;
+
+/**
+ * Class for {@code JdbcPojoCacheStore} tests.
+ */
+public class PojoJdbcCacheStoreTest extends GridCommonAbstractTest {
+    /** Default connection URL (value is <tt>jdbc:h2:mem:autoCacheStore;DB_CLOSE_DELAY=-1</tt>). */
+    protected static final String DFLT_CONN_URL = "jdbc:h2:mem:autoCacheStore;DB_CLOSE_DELAY=-1";
+
+    /** Default config with mapping. */
+    protected static final String DFLT_MAPPING_CONFIG = "modules/core/src/test/config/store/jdbc/Ignite.xml";
+
+    /** Organization count. */
+    protected static final int ORGANIZATION_CNT = 1000;
+
+    /** Person count. */
+    protected static final int PERSON_CNT = 100000;
+
+    /** Test cache session; written into the store's {@code ses} field by {@link #inject(JdbcCacheStore)}. */
+    protected TestThreadLocalCacheSession ses = new TestThreadLocalCacheSession();
+
+    /** Store under test, created via {@link #store()} in the constructor. */
+    protected final JdbcPojoCacheStore store;
+
+    /**
+     * @throws Exception If failed.
+     */
+    @SuppressWarnings({"AbstractMethodCallInConstructor", "OverriddenMethodCallDuringObjectConstruction"})
+    public PojoJdbcCacheStoreTest() throws Exception {
+        super(false);
+
+        store = store();
+
+        inject(store);
+    }
+
+    /**
+     * @return Store.
+     */
+    protected JdbcPojoCacheStore store() throws IgniteCheckedException {
+        JdbcPojoCacheStore store = new JdbcPojoCacheStore();
+
+        store.setDataSource(JdbcConnectionPool.create(DFLT_CONN_URL, "sa", ""));
+
+        return store;
+    }
+
+    /**
+     * @param store Store.
+     * @throws Exception If failed.
+     */
+    protected void inject(JdbcCacheStore store) throws Exception {
+        getTestResources().inject(store);
+
+        GridTestUtils.setFieldValue(store, CacheStore.class, "ses", ses);
+
+        UrlResource metaUrl;
+
+        try {
+            metaUrl = new UrlResource(new File(DFLT_MAPPING_CONFIG).toURI().toURL());
+        }
+        catch (MalformedURLException e) {
+            throw new IgniteCheckedException("Failed to resolve metadata path [err=" + e.getMessage() + ']', e);
+        }
+
+        try {
+            GenericApplicationContext springCtx = new GenericApplicationContext();
+
+            new XmlBeanDefinitionReader(springCtx).loadBeanDefinitions(metaUrl);
+
+            springCtx.refresh();
+
+            Collection<CacheQueryTypeMetadata> typeMetadata =
+                springCtx.getBeansOfType(CacheQueryTypeMetadata.class).values();
+
+            Map<Integer, Map<Object, JdbcCacheStore.EntryMapping>> cacheMappings = new ConcurrentHashMap<>();
+
+            JdbcDialect dialect = store.resolveDialect();
+
+            GridTestUtils.setFieldValue(store, JdbcCacheStore.class, "dialect", dialect);
+
+            Map<Object, JdbcCacheStore.EntryMapping> entryMappings = U.newHashMap(typeMetadata.size());
+
+            for (CacheQueryTypeMetadata type : typeMetadata)
+                entryMappings.put(store.keyId(type.getKeyType()), new JdbcCacheStore.EntryMapping(dialect, type));
+
+            store.buildTypeCache(typeMetadata);
+
+            cacheMappings.put(0, Collections.unmodifiableMap(entryMappings));
+
+            GridTestUtils.setFieldValue(store, JdbcCacheStore.class, "cacheMappings", cacheMappings);
+        }
+        catch (BeansException e) {
+            if (X.hasCause(e, ClassNotFoundException.class))
+                throw new IgniteCheckedException("Failed to instantiate Spring XML application context " +
+                    "(make sure all classes used in Spring configuration are present at CLASSPATH) " +
+                    "[springUrl=" + metaUrl + ']', e);
+            else
+                throw new IgniteCheckedException("Failed to instantiate Spring XML application context [springUrl=" +
+                    metaUrl + ", err=" + e.getMessage() + ']', e);
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testLoadCache() throws Exception {
+        Connection conn = DriverManager.getConnection(DFLT_CONN_URL, "sa", "");
+
+        Statement stmt = conn.createStatement();
+
+        PreparedStatement orgStmt = conn.prepareStatement("INSERT INTO Organization(id, name, city) VALUES (?, ?, ?)");
+
+        for (int i = 0; i < ORGANIZATION_CNT; i++) {
+            orgStmt.setInt(1, i);
+            orgStmt.setString(2, "name" + i);
+            orgStmt.setString(3, "city" + i % 10);
+
+            orgStmt.addBatch();
+        }
+
+        orgStmt.executeBatch();
+
+        conn.commit();
+
+        PreparedStatement prnStmt = conn.prepareStatement("INSERT INTO Person(id, org_id, name) VALUES (?, ?, ?)");
+
+        for (int i = 0; i < PERSON_CNT; i++) {
+            prnStmt.setInt(1, i);
+            prnStmt.setInt(2, i % 100);
+            prnStmt.setString(3, "name" + i);
+
+            prnStmt.addBatch();
+        }
+
+        prnStmt.executeBatch();
+
+        conn.commit();
+
+        U.closeQuiet(stmt);
+
+        U.closeQuiet(conn);
+
+        final Collection<OrganizationKey> orgKeys = new ConcurrentLinkedQueue<>();
+        final Collection<PersonKey> prnKeys = new ConcurrentLinkedQueue<>();
+
+        IgniteBiInClosure<Object, Object> c = new CI2<Object, Object>() {
+            @Override public void apply(Object k, Object v) {
+                if (k instanceof OrganizationKey && v instanceof Organization)
+                    orgKeys.add((OrganizationKey)k);
+                else if (k instanceof PersonKey && v instanceof Person)
+                    prnKeys.add((PersonKey)k);
+            }
+        };
+
+        store.loadCache(c);
+
+        assertEquals(ORGANIZATION_CNT, orgKeys.size());
+        assertEquals(PERSON_CNT, prnKeys.size());
+
+        store.deleteAll(orgKeys);
+        store.deleteAll(prnKeys);
+
+        orgKeys.clear();
+        prnKeys.clear();
+
+        store.loadCache(c);
+
+        assertTrue(orgKeys.isEmpty());
+        assertTrue(prnKeys.isEmpty());
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testStore() throws Exception {
+        // Create dummy transaction.
+        IgniteTx tx = new DummyTx();
+
+        ses.newSession(tx);
+
+        OrganizationKey k1 = new OrganizationKey(1);
+        Organization v1 = new Organization(1, "Name1", "City1");
+
+        OrganizationKey k2 = new OrganizationKey(2);
+        Organization v2 = new Organization(2, "Name2", "City2");
+
+        store.write(new CacheEntryImpl<>(k1, v1));
+        store.write(new CacheEntryImpl<>(k2, v2));
+
+        store.txEnd(true);
+
+        ses.newSession(null);
+
+        assertEquals(v1, store.load(k1));
+        assertEquals(v2, store.load(k2));
+
+        ses.newSession(tx);
+
+        OrganizationKey k3 = new OrganizationKey(3);
+
+        assertNull(store.load(k3));
+
+        store.delete(k1);
+
+        store.txEnd(true);
+
+        assertNull(store.load(k1));
+        assertEquals(v2, store.load(k2));
+
+        ses.newSession(null);
+
+        assertNull(store.load(k3));
+    }
+
+    /**
+     * @throws IgniteCheckedException If failed.
+     */
+    public void testRollback() throws IgniteCheckedException {
+        IgniteTx tx = new DummyTx();
+
+        ses.newSession(tx);
+
+        OrganizationKey k1 = new OrganizationKey(1);
+        Organization v1 = new Organization(1, "Name1", "City1");
+
+        // Put.
+        store.write(new CacheEntryImpl<>(k1, v1));
+
+        store.txEnd(false); // Rollback.
+
+        tx = new DummyTx();
+
+        ses.newSession(tx);
+
+        assertNull(store.load(k1));
+
+        OrganizationKey k2 = new OrganizationKey(2);
+        Organization v2 = new Organization(2, "Name2", "City2");
+
+        // Put all.
+        assertNull(store.load(k2));
+
+        Collection<Cache.Entry<?, ?>> col = new ArrayList<>();
+
+        col.add(new CacheEntryImpl<>(k2, v2));
+
+        store.writeAll(col);
+
+        store.txEnd(false); // Rollback.
+
+        tx = new DummyTx();
+
+        ses.newSession(tx);
+
+        assertNull(store.load(k2));
+
+        OrganizationKey k3 = new OrganizationKey(3);
+        Organization v3 = new Organization(3, "Name3", "City3");
+
+        col = new ArrayList<>();
+
+        col.add(new CacheEntryImpl<>(k3, v3));
+
+        store.writeAll(col);
+
+        store.txEnd(true); // Commit.
+
+        tx = new DummyTx();
+
+        ses.newSession(tx);
+
+        assertEquals(v3, store.load(k3));
+
+        OrganizationKey k4 = new OrganizationKey(4);
+        Organization v4 = new Organization(4, "Name4", "City4");
+
+        store.write(new CacheEntryImpl<>(k4, v4));
+
+        store.txEnd(false); // Rollback.
+
+        tx = new DummyTx();
+
+        ses.newSession(tx);
+
+        assertNull(store.load(k4));
+
+        assertEquals(v3, store.load(k3));
+
+        // Remove.
+        store.delete(k3);
+
+        store.txEnd(false); // Rollback.
+
+        tx = new DummyTx();
+
+        ses.newSession(tx);
+
+        assertEquals(v3, store.load(k3));
+
+        // Remove all.
+        store.deleteAll(Arrays.asList(k3));
+
+        store.txEnd(false); // Rollback.
+
+        tx = new DummyTx();
+
+        ses.newSession(tx);
+
+        assertEquals(v3, store.load(k3));
+    }
+
+    /**
+     */
+    public void testAllOpsWithTXNoCommit() {
+        doTestAllOps(new DummyTx(), false);
+    }
+
+    /**
+     */
+    public void testAllOpsWithTXCommit() {
+        doTestAllOps(new DummyTx(), true);
+    }
+
+    /**
+     */
+    public void testAllOpsWithoutTX() {
+        doTestAllOps(null, false);
+    }
+
+    /**
+     * @param tx Transaction.
+     * @param commit Commit.
+     */
+    private void doTestAllOps(@Nullable IgniteTx tx, boolean commit) {
+        try {
+            ses.newSession(tx);
+
+            final OrganizationKey k1 = new OrganizationKey(1);
+            final Organization v1 = new Organization(1, "Name1", "City1");
+
+            store.write(new CacheEntryImpl<>(k1, v1));
+
+            if (tx != null && commit) {
+                store.txEnd(true);
+
+                tx = new DummyTx();
+
+                ses.newSession(tx);
+            }
+
+            if (tx == null || commit)
+                assertEquals(v1, store.load(k1));
+
+            Collection<Cache.Entry<?, ?>> col = new ArrayList<>();
+
+            final OrganizationKey k2 = new OrganizationKey(2);
+            final Organization v2 = new Organization(2, "Name2", "City2");
+
+            final OrganizationKey k3 = new OrganizationKey(3);
+            final Organization v3 = new Organization(3, "Name3", "City3");
+
+            col.add(new CacheEntryImpl<>(k2, v2));
+            col.add(new CacheEntryImpl<>(k3, v3));
+
+            store.writeAll(col);
+
+            if (tx != null && commit) {
+                store.txEnd(true);
+
+                tx = new DummyTx();
+
+                ses.newSession(tx);
+            }
+
+            final AtomicInteger cntr = new AtomicInteger();
+
+            final OrganizationKey no_such_key = new OrganizationKey(4);
+
+            if (tx == null || commit) {
+                Map<Object, Object> loaded = store.loadAll(Arrays.asList(k1, k2, k3, no_such_key));
+
+                for (Map.Entry<Object, Object> e : loaded.entrySet()) {
+                    Object key = e.getKey();
+                    Object val = e.getValue();
+
+                    if (k1.equals(key))
+                        assertEquals(v1, val);
+
+                    if (k2.equals(key))
+                        assertEquals(v2, val);
+
+                    if (k3.equals(key))
+                        assertEquals(v3, val);
+
+                    if (no_such_key.equals(key))
+                        fail();
+
+                    cntr.incrementAndGet();
+                }
+
+                assertEquals(3, cntr.get());
+            }
+
+            store.deleteAll(Arrays.asList(k2, k3));
+
+            if (tx != null && commit) {
+                store.txEnd(true);
+
+                tx = new DummyTx();
+
+                ses.newSession(tx);
+            }
+
+            if (tx == null || commit) {
+                assertNull(store.load(k2));
+                assertNull(store.load(k3));
+                assertEquals(v1, store.load(k1));
+            }
+
+            store.delete(k1);
+
+            if (tx != null && commit) {
+                store.txEnd(true);
+
+                tx = new DummyTx();
+
+                ses.newSession(tx);
+            }
+
+            if (tx == null || commit)
+                assertNull(store.load(k1));
+        }
+        finally {
+            if (tx != null)
+                store.txEnd(false);
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testSimpleMultithreading() throws Exception {
+        final Random rnd = new Random();
+
+        final Queue<OrganizationKey> queue = new LinkedBlockingQueue<>();
+
+        multithreaded(new Callable<Object>() {
+            @Nullable @Override public Object call() throws Exception {
+                for (int i = 0; i < 1000; i++) {
+                    IgniteTx tx = rnd.nextBoolean() ? new DummyTx() : null;
+
+                    ses.newSession(tx);
+
+                    int op = rnd.nextInt(10);
+
+                    boolean queueEmpty = false;
+
+                    if (op < 4) { // Load.
+                        OrganizationKey key = queue.poll();
+
+                        if (key == null)
+                            queueEmpty = true;
+                        else {
+                            if (rnd.nextBoolean())
+                                assertNotNull(store.load(key));
+                            else {
+                                Map<Object, Object> loaded = store.loadAll(Collections.singleton(key));
+
+                                assertEquals(1, loaded.size());
+
+                                Map.Entry<Object, Object> e = loaded.entrySet().iterator().next();
+
+                                OrganizationKey k = (OrganizationKey)e.getKey();
+                                Organization v = (Organization)e.getValue();
+
+                                assertTrue(k.getId().equals(v.getId()));
+                            }
+
+                            if (tx != null)
+                                store.txEnd(true);
+
+                            queue.add(key);
+                        }
+                    }
+                    else if (op < 6) { // Remove.
+                        OrganizationKey key = queue.poll();
+
+                        if (key == null)
+                            queueEmpty = true;
+                        else {
+                            if (rnd.nextBoolean())
+                                store.delete(key);
+                            else
+                                store.deleteAll(Collections.singleton(key));
+
+                            if (tx != null)
+                                store.txEnd(true);
+                        }
+                    }
+                    else { // Update.
+                        OrganizationKey key = queue.poll();
+
+                        if (key == null)
+                            queueEmpty = true;
+                        else {
+                            Organization val =
+                                new Organization(key.getId(), "Name" + key.getId(), "City" + key.getId());
+
+                            Cache.Entry<OrganizationKey, Organization> entry = new CacheEntryImpl<>(key, val);
+
+                            if (rnd.nextBoolean())
+                                store.write(entry);
+                            else {
+                                Collection<Cache.Entry<?, ?>> col = new ArrayList<>();
+
+                                col.add(entry);
+
+                                store.writeAll(col);
+                            }
+
+                            if (tx != null)
+                                store.txEnd(true);
+
+                            queue.add(key);
+                        }
+                    }
+
+                    if (queueEmpty) { // Add.
+                        OrganizationKey key = new OrganizationKey(rnd.nextInt());
+                        Organization val = new Organization(key.getId(), "Name" + key.getId(), "City" + key.getId());
+
+                        Cache.Entry<OrganizationKey, Organization> entry = new CacheEntryImpl<>(key, val);
+
+                        if (rnd.nextBoolean())
+                            store.write(entry);
+                        else {
+                            Collection<Cache.Entry<?, ?>> col = new ArrayList<>();
+
+                            col.add(entry);
+
+                            store.writeAll(col);
+                        }
+
+                        if (tx != null)
+                            store.txEnd(true);
+
+                        queue.add(key);
+                    }
+                }
+
+                return null;
+            }
+        }, 37);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        Class.forName("org.h2.Driver");
+        Connection conn = DriverManager.getConnection(DFLT_CONN_URL, "sa", "");
+
+        Statement stmt = conn.createStatement();
+
+        stmt.executeUpdate("DROP TABLE IF EXISTS Organization");
+        stmt.executeUpdate("DROP TABLE IF EXISTS Person");
+
+        stmt.executeUpdate("CREATE TABLE Organization (id integer PRIMARY KEY, name varchar(50), city varchar(50))");
+        stmt.executeUpdate("CREATE TABLE Person (id integer PRIMARY KEY, org_id integer, name varchar(50))");
+
+        stmt.executeUpdate("CREATE INDEX Org_Name_IDX On Organization (name)");
+        stmt.executeUpdate("CREATE INDEX Org_Name_City_IDX On Organization (name, city)");
+        stmt.executeUpdate("CREATE INDEX Person_Name_IDX1 On Person (name)");
+        stmt.executeUpdate("CREATE INDEX Person_Name_IDX2 On Person (name desc)");
+
+        conn.commit();
+
+        U.closeQuiet(stmt);
+
+        U.closeQuiet(conn);
+    }
+}


Mime
View raw message