ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [11/30] ignite git commit: IGNTIE-4220: Support statements for JDBC and Cassandra store. This closes #1898.
Date Thu, 11 May 2017 09:56:22 GMT
IGNTIE-4220: Support statements for JDBC and Cassandra store. This closes #1898.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8cd9fbe5
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8cd9fbe5
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8cd9fbe5

Branch: refs/heads/ignite-5075-cacheStart
Commit: 8cd9fbe59050b788ce9cec7f0f2b59bca36e7545
Parents: 988ca32
Author: Andrey V. Mashenkov <andrey.mashenkov@gmail.com>
Authored: Thu May 4 17:08:29 2017 +0300
Committer: Andrey V. Mashenkov <andrey.mashenkov@gmail.com>
Committed: Thu May 4 18:20:47 2017 +0300

----------------------------------------------------------------------
 .../store/cassandra/CassandraCacheStore.java    | 16 ++++-
 .../session/LoadCacheCustomQueryWorker.java     | 26 +++++--
 .../ignite/tests/IgnitePersistentStoreTest.java | 30 ++++++---
 .../tests/persistence/blob/ignite-config.xml    |  4 +-
 .../tests/persistence/pojo/ignite-config.xml    |  4 +-
 .../persistence/primitive/ignite-config.xml     |  4 +-
 .../primitive/ignite-remote-server-config.xml   |  4 +-
 .../store/jdbc/CacheAbstractJdbcStore.java      | 71 ++++++++++++++------
 .../CacheJdbcPojoStoreAbstractSelfTest.java     | 49 ++++++++++++++
 9 files changed, 163 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/8cd9fbe5/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/CassandraCacheStore.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/CassandraCacheStore.java
b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/CassandraCacheStore.java
index 70d798b..98c8b40 100644
--- a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/CassandraCacheStore.java
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/CassandraCacheStore.java
@@ -20,6 +20,7 @@ package org.apache.ignite.cache.store.cassandra;
 import com.datastax.driver.core.BoundStatement;
 import com.datastax.driver.core.PreparedStatement;
 import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Statement;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.LinkedList;
@@ -113,10 +114,19 @@ public class CassandraCacheStore<K, V> implements CacheStore<K,
V> {
             CassandraSession ses = getCassandraSession();
 
             for (Object obj : args) {
-                if (obj == null || !(obj instanceof String) || !((String)obj).trim().toLowerCase().startsWith("select"))
-                    continue;
+                LoadCacheCustomQueryWorker<K, V> task = null;
 
-                futs.add(pool.submit(new LoadCacheCustomQueryWorker<>(ses, (String)
obj, controller, log, clo)));
+                if (obj instanceof Statement)
+                    task = new LoadCacheCustomQueryWorker<>(ses, (Statement)obj, controller,
log, clo);
+                else if (obj instanceof String) {
+                    String qry = ((String)obj).trim();
+
+                    if (qry.toLowerCase().startsWith("select"))
+                        task = new LoadCacheCustomQueryWorker<>(ses, (String) obj,
controller, log, clo);
+                }
+
+                if (task != null)
+                    futs.add(pool.submit(task));
             }
 
             for (Future<?> fut : futs)

http://git-wip-us.apache.org/repos/asf/ignite/blob/8cd9fbe5/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/LoadCacheCustomQueryWorker.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/LoadCacheCustomQueryWorker.java
b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/LoadCacheCustomQueryWorker.java
index d3ace7d..d186b98 100644
--- a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/LoadCacheCustomQueryWorker.java
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/LoadCacheCustomQueryWorker.java
@@ -36,8 +36,8 @@ public class LoadCacheCustomQueryWorker<K, V> implements Callable<Void>
{
     /** Cassandra session to execute CQL query */
     private final CassandraSession ses;
 
-    /** User query. */
-    private final String qry;
+    /** Statement. */
+    private final Statement stmt;
 
     /** Persistence controller */
     private final PersistenceController ctrl;
@@ -49,12 +49,28 @@ public class LoadCacheCustomQueryWorker<K, V> implements Callable<Void>
{
     private final IgniteBiInClosure<K, V> clo;
 
     /**
+     * @param ses Session.
+     * @param qry Query.
+     * @param ctrl Control.
+     * @param log Logger.
      * @param clo Closure for loaded values.
      */
     public LoadCacheCustomQueryWorker(CassandraSession ses, String qry, PersistenceController
ctrl,
-        IgniteLogger log, IgniteBiInClosure<K, V> clo) {
+                                      IgniteLogger log, IgniteBiInClosure<K, V> clo)
{
+        this(ses, new SimpleStatement(qry.trim().endsWith(";") ? qry : qry + ';'), ctrl,
log, clo);
+    }
+
+    /**
+     * @param ses Session.
+     * @param stmt Statement.
+     * @param ctrl Control.
+     * @param log Logger.
+     * @param clo Closure for loaded values.
+     */
+    public LoadCacheCustomQueryWorker(CassandraSession ses, Statement stmt, PersistenceController
ctrl,
+                                      IgniteLogger log, IgniteBiInClosure<K, V> clo)
{
         this.ses = ses;
-        this.qry = qry.trim().endsWith(";") ? qry : qry + ";";
+        this.stmt = stmt;
         this.ctrl = ctrl;
         this.log = log;
         this.clo = clo;
@@ -70,7 +86,7 @@ public class LoadCacheCustomQueryWorker<K, V> implements Callable<Void>
{
 
             /** {@inheritDoc} */
             @Override public Statement getStatement() {
-                return new SimpleStatement(qry);
+                return stmt;
             }
 
             /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/8cd9fbe5/modules/cassandra/store/src/test/java/org/apache/ignite/tests/IgnitePersistentStoreTest.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/test/java/org/apache/ignite/tests/IgnitePersistentStoreTest.java
b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/IgnitePersistentStoreTest.java
index 97e7230..c8c7139 100644
--- a/modules/cassandra/store/src/test/java/org/apache/ignite/tests/IgnitePersistentStoreTest.java
+++ b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/IgnitePersistentStoreTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.tests;
 
+import com.datastax.driver.core.SimpleStatement;
 import java.util.Collection;
 import java.util.Date;
 import java.util.Map;
@@ -43,6 +44,7 @@ import org.apache.log4j.Logger;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.junit.Assert;
 import org.springframework.core.io.ClassPathResource;
 
 /**
@@ -429,31 +431,39 @@ public class IgnitePersistentStoreTest {
         LOGGER.info("Running loadCache test");
 
         try (Ignite ignite = Ignition.start("org/apache/ignite/tests/persistence/pojo/ignite-config.xml"))
{
-            IgniteCache<PersonId, Person> personCache3 = ignite.getOrCreateCache(new
CacheConfiguration<PersonId, Person>("cache3"));
+            CacheConfiguration<PersonId, Person> ccfg = new CacheConfiguration<>("cache3");
+
+            IgniteCache<PersonId, Person> personCache3 = ignite.getOrCreateCache(ccfg);
+
             int size = personCache3.size(CachePeekMode.ALL);
 
             LOGGER.info("Initial cache size " + size);
 
             LOGGER.info("Loading cache data from Cassandra table");
 
-            personCache3.loadCache(null, new String[] {"select * from test1.pojo_test3 limit
3"});
+            String qry = "select * from test1.pojo_test3 limit 3";
+
+            personCache3.loadCache(null, qry);
 
             size = personCache3.size(CachePeekMode.ALL);
-            if (size != 3) {
-                throw new RuntimeException("Cache data was incorrectly loaded from Cassandra.
" +
-                    "Expected number of records is 3, but loaded number of records is " +
size);
-            }
+            Assert.assertEquals("Cache data was incorrectly loaded from Cassandra table by
'" + qry + "'", 3, size);
+
+            personCache3.clear();
+
+            personCache3.loadCache(null, new SimpleStatement(qry));
+
+            size = personCache3.size(CachePeekMode.ALL);
+            Assert.assertEquals("Cache data was incorrectly loaded from Cassandra table by
statement", 3, size);
 
             personCache3.clear();
 
             personCache3.loadCache(null);
 
             size = personCache3.size(CachePeekMode.ALL);
-            if (size != TestsHelper.getBulkOperationSize()) {
-                throw new RuntimeException("Cache data was incorrectly loaded from Cassandra.
" +
+            Assert.assertEquals("Cache data was incorrectly loaded from Cassandra. " +
                     "Expected number of records is " + TestsHelper.getBulkOperationSize()
+
-                    ", but loaded number of records is " + size);
-            }
+                    ", but loaded number of records is " + size,
+                TestsHelper.getBulkOperationSize(), size);
 
             LOGGER.info("Cache data loaded from Cassandra table");
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/8cd9fbe5/modules/cassandra/store/src/test/resources/org/apache/ignite/tests/persistence/blob/ignite-config.xml
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/test/resources/org/apache/ignite/tests/persistence/blob/ignite-config.xml
b/modules/cassandra/store/src/test/resources/org/apache/ignite/tests/persistence/blob/ignite-config.xml
index fbf38e9..db360d5 100644
--- a/modules/cassandra/store/src/test/resources/org/apache/ignite/tests/persistence/blob/ignite-config.xml
+++ b/modules/cassandra/store/src/test/resources/org/apache/ignite/tests/persistence/blob/ignite-config.xml
@@ -78,8 +78,8 @@
                         to our documentation: http://apacheignite.readme.io/docs/cluster-config
                     -->
                     <!-- Uncomment static IP finder to enable static-based discovery of
initial nodes. -->
-                    <!--<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">-->
-                    <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder">
+                    <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
+                    <!--<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder">-->
                         <property name="addresses">
                             <list>
                                 <!-- In distributed environment, replace with actual host
IP address. -->

http://git-wip-us.apache.org/repos/asf/ignite/blob/8cd9fbe5/modules/cassandra/store/src/test/resources/org/apache/ignite/tests/persistence/pojo/ignite-config.xml
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/test/resources/org/apache/ignite/tests/persistence/pojo/ignite-config.xml
b/modules/cassandra/store/src/test/resources/org/apache/ignite/tests/persistence/pojo/ignite-config.xml
index c9b45c8..af4ffef 100644
--- a/modules/cassandra/store/src/test/resources/org/apache/ignite/tests/persistence/pojo/ignite-config.xml
+++ b/modules/cassandra/store/src/test/resources/org/apache/ignite/tests/persistence/pojo/ignite-config.xml
@@ -153,8 +153,8 @@
                         to our documentation: http://apacheignite.readme.io/docs/cluster-config
                     -->
                     <!-- Uncomment static IP finder to enable static-based discovery of
initial nodes. -->
-                    <!--<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">-->
-                    <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder">
+                    <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
+                    <!--<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder">-->
                         <property name="addresses">
                             <list>
                                 <!-- In distributed environment, replace with actual host
IP address. -->

http://git-wip-us.apache.org/repos/asf/ignite/blob/8cd9fbe5/modules/cassandra/store/src/test/resources/org/apache/ignite/tests/persistence/primitive/ignite-config.xml
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/test/resources/org/apache/ignite/tests/persistence/primitive/ignite-config.xml
b/modules/cassandra/store/src/test/resources/org/apache/ignite/tests/persistence/primitive/ignite-config.xml
index 13e0922..a7d101d 100644
--- a/modules/cassandra/store/src/test/resources/org/apache/ignite/tests/persistence/primitive/ignite-config.xml
+++ b/modules/cassandra/store/src/test/resources/org/apache/ignite/tests/persistence/primitive/ignite-config.xml
@@ -78,8 +78,8 @@
                         to our documentation: http://apacheignite.readme.io/docs/cluster-config
                     -->
                     <!-- Uncomment static IP finder to enable static-based discovery of
initial nodes. -->
-                    <!--<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">-->
-                    <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder">
+                    <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
+                    <!--<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder">-->
                         <property name="addresses">
                             <list>
                                 <!-- In distributed environment, replace with actual host
IP address. -->

http://git-wip-us.apache.org/repos/asf/ignite/blob/8cd9fbe5/modules/cassandra/store/src/test/resources/org/apache/ignite/tests/persistence/primitive/ignite-remote-server-config.xml
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/test/resources/org/apache/ignite/tests/persistence/primitive/ignite-remote-server-config.xml
b/modules/cassandra/store/src/test/resources/org/apache/ignite/tests/persistence/primitive/ignite-remote-server-config.xml
index 8d71aec..bbaff8c 100644
--- a/modules/cassandra/store/src/test/resources/org/apache/ignite/tests/persistence/primitive/ignite-remote-server-config.xml
+++ b/modules/cassandra/store/src/test/resources/org/apache/ignite/tests/persistence/primitive/ignite-remote-server-config.xml
@@ -94,8 +94,8 @@
                         to our documentation: http://apacheignite.readme.io/docs/cluster-config
                     -->
                     <!-- Uncomment static IP finder to enable static-based discovery of
initial nodes. -->
-                    <!--<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">-->
-                    <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder">
+                    <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
+                    <!--<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder">-->
                         <property name="addresses">
                             <list>
                                 <!-- In distributed environment, replace with actual host
IP address. -->

http://git-wip-us.apache.org/repos/asf/ignite/blob/8cd9fbe5/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java
b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java
index ba2a98d..46e9022 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java
@@ -702,17 +702,34 @@ public abstract class CacheAbstractJdbcStore<K, V> implements
CacheStore<K, V>,
                         }
                     }))
                         throw new CacheLoaderException("Provided key type is not found in
store or cache configuration " +
-                            "[cache=" + U.maskName(cacheName) + ", key=" + keyType + "]");
-
-                    String qry = args[i + 1].toString();
+                            "[cache=" + U.maskName(cacheName) + ", key=" + keyType + ']');
 
                     EntryMapping em = entryMapping(cacheName, typeIdForTypeName(kindForName(keyType),
keyType));
 
-                    if (log.isInfoEnabled())
-                        log.info("Started load cache using custom query [cache=" + U.maskName(cacheName)
+
-                            ", keyType=" + keyType + ", query=" + qry + "]");
+                    Object arg = args[i + 1];
+
+                    LoadCacheCustomQueryWorker<K, V> task;
+
+                    if (arg instanceof PreparedStatement) {
+                        PreparedStatement stmt = (PreparedStatement)arg;
+
+                        if (log.isInfoEnabled())
+                            log.info("Started load cache using custom statement [cache="
+ U.maskName(cacheName) +
+                                      ", keyType=" + keyType + ", stmt=" + stmt + ']');
+
+                        task = new LoadCacheCustomQueryWorker<>(em, stmt, clo);
+                    }
+                    else {
+                        String qry = arg.toString();
 
-                    futs.add(pool.submit(new LoadCacheCustomQueryWorker<>(em, qry,
clo)));
+                        if (log.isInfoEnabled())
+                              log.info("Started load cache using custom query [cache=" +
U.maskName(cacheName) +
+                                  ", keyType=" + keyType + ", query=" + qry + ']');
+
+                        task = new LoadCacheCustomQueryWorker<>(em, qry, clo);
+                    }
+
+                    futs.add(pool.submit(task));
                 }
             }
             else {
@@ -727,7 +744,7 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K,
V>,
                     processedKeyTypes.add(keyType);
 
                     if (log.isInfoEnabled())
-                        log.info("Started load cache [cache=" + U.maskName(cacheName) + ",
keyType=" + keyType + "]");
+                        log.info("Started load cache [cache=" + U.maskName(cacheName) + ",
keyType=" + keyType + ']');
 
                     if (parallelLoadCacheMinThreshold > 0) {
                         Connection conn = null;
@@ -744,7 +761,7 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K,
V>,
                             if (rs.next()) {
                                 if (log.isDebugEnabled())
                                     log.debug("Multithread loading entries from db [cache="
+ U.maskName(cacheName) +
-                                        ", keyType=" + keyType + "]");
+                                        ", keyType=" + keyType + ']');
 
                                 int keyCnt = em.keyCols.size();
 
@@ -773,7 +790,7 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K,
V>,
                         }
                         catch (SQLException e) {
                             log.warning("Failed to load entries from db in multithreaded
mode, will try in single thread " +
-                                "[cache=" + U.maskName(cacheName) + ", keyType=" + keyType
+ " ]", e);
+                                "[cache=" + U.maskName(cacheName) + ", keyType=" + keyType
+ ']', e);
                         }
                         finally {
                             U.closeQuiet(conn);
@@ -782,7 +799,7 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K,
V>,
 
                     if (log.isDebugEnabled())
                         log.debug("Single thread loading entries from db [cache=" + U.maskName(cacheName)
+
-                            ", keyType=" + keyType + "]");
+                            ", keyType=" + keyType + ']');
 
                     futs.add(pool.submit(loadCacheFull(em, clo)));
                 }
@@ -809,7 +826,7 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K,
V>,
         EntryMapping em = entryMapping(session().cacheName(), typeIdForObject(key));
 
         if (log.isDebugEnabled())
-            log.debug("Load value from db [table= " + em.fullTableName() + ", key=" + key
+ "]");
+            log.debug("Load value from db [table= " + em.fullTableName() + ", key=" + key
+ ']');
 
         Connection conn = null;
 
@@ -1910,14 +1927,28 @@ public abstract class CacheAbstractJdbcStore<K, V> implements
CacheStore<K, V>,
         /** Entry mapping description. */
         private final EntryMapping em;
 
+        /** User statement. */
+        private PreparedStatement stmt;
+
         /** User query. */
-        private final String qry;
+        private String qry;
 
         /** Closure for loaded values. */
         private final IgniteBiInClosure<K1, V1> clo;
 
         /**
          * @param em Entry mapping description.
+         * @param stmt User statement.
+         * @param clo Closure for loaded values.
+         */
+        private LoadCacheCustomQueryWorker(EntryMapping em, PreparedStatement stmt, IgniteBiInClosure<K1,
V1> clo) {
+            this.em = em;
+            this.stmt = stmt;
+            this.clo = clo;
+        }
+
+        /**
+         * @param em Entry mapping description.
          * @param qry User query.
          * @param clo Closure for loaded values.
          */
@@ -1931,12 +1962,12 @@ public abstract class CacheAbstractJdbcStore<K, V> implements
CacheStore<K, V>,
         @Override public Void call() throws Exception {
             Connection conn = null;
 
-            PreparedStatement stmt = null;
-
             try {
-                conn = openConnection(true);
+                if (stmt == null) {
+                    conn = openConnection(true);
 
-                stmt = conn.prepareStatement(qry);
+                    stmt = conn.prepareStatement(qry);
+                }
 
                 stmt.setFetchSize(dialect.getFetchSize());
 
@@ -1962,9 +1993,11 @@ public abstract class CacheAbstractJdbcStore<K, V> implements
CacheStore<K, V>,
                 throw new CacheLoaderException("Failed to execute custom query for load cache",
e);
             }
             finally {
-                U.closeQuiet(stmt);
+                if (conn != null) {
+                    U.closeQuiet(stmt);
 
-                U.closeQuiet(conn);
+                    U.closeQuiet(conn);
+                }
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/8cd9fbe5/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreAbstractSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreAbstractSelfTest.java
index 8544f71..703cbe1 100644
--- a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreAbstractSelfTest.java
@@ -337,6 +337,55 @@ public abstract class CacheJdbcPojoStoreAbstractSelfTest extends GridCommonAbstr
     }
 
     /**
+     * Checks that data was loaded correctly with prepared statement.
+     */
+    protected void checkCacheLoadWithStatement() throws SQLException {
+        Connection conn = null;
+
+        PreparedStatement stmt = null;
+
+        try {
+            conn = getConnection();
+
+            conn.setAutoCommit(true);
+
+            String qry = "select id, org_id, name, birthday, gender from Person";
+
+            stmt = conn.prepareStatement(qry);
+
+            IgniteCache<Object, Object> c1 = grid().cache(CACHE_NAME);
+
+            c1.loadCache(null, "org.apache.ignite.cache.store.jdbc.model.PersonKey", stmt);
+
+            assertEquals(PERSON_CNT, c1.size());
+        }
+        finally {
+            U.closeQuiet(stmt);
+
+            U.closeQuiet(conn);
+        }
+
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testLoadCacheWithStatement() throws Exception {
+        startTestGrid(false, false, false, false, 512);
+
+        checkCacheLoadWithStatement();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testLoadCacheWithStatementTx() throws Exception {
+        startTestGrid(false, false, false, true, 512);
+
+        checkCacheLoadWithStatement();
+    }
+
+    /**
      * @throws Exception If failed.
      */
     public void testLoadCache() throws Exception {


Mime
View raw message