ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From akuznet...@apache.org
Subject [1/3] ignite git commit: IGNITE-3708 Fixed multithreaded loading entries for MySql.
Date Mon, 22 Aug 2016 08:09:04 GMT
Repository: ignite
Updated Branches:
  refs/heads/master 2d4360707 -> 05c5939ae


 IGNITE-3708 Fixed multithreaded loading entries for MySql.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d399db92
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d399db92
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d399db92

Branch: refs/heads/master
Commit: d399db92ab4e147a3933a42dd5635b225665ac63
Parents: 974467a
Author: Alexey Kuznetsov <akuznetsov@apache.org>
Authored: Mon Aug 22 15:00:06 2016 +0700
Committer: Alexey Kuznetsov <akuznetsov@apache.org>
Committed: Mon Aug 22 15:00:06 2016 +0700

----------------------------------------------------------------------
 .../store/jdbc/CacheAbstractJdbcStore.java      | 43 +++++++++++---------
 .../store/jdbc/dialect/BasicJdbcDialect.java    |  7 +++-
 .../cache/store/jdbc/dialect/JdbcDialect.java   | 11 ++++-
 .../cache/store/jdbc/dialect/MySQLDialect.java  | 18 +++++++-
 .../jdbc/CacheJdbcPojoStoreFactorySelfTest.java | 11 +++--
 5 files changed, 65 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/d399db92/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java
index c16f2c6..aad05e0 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java
@@ -495,10 +495,11 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>,
      * @param clo Closure that will be applied to loaded values.
      * @param lowerBound Lower bound for range.
      * @param upperBound Upper bound for range.
+     * @param fetchSize Number of rows to fetch from DB.
      * @return Callable for pool submit.
      */
     private Callable<Void> loadCacheRange(final EntryMapping em, final IgniteBiInClosure<K, V> clo,
-        @Nullable final Object[] lowerBound, @Nullable final Object[] upperBound) {
+        @Nullable final Object[] lowerBound, @Nullable final Object[] upperBound, final int fetchSize) {
         return new Callable<Void>() {
             @Override public Void call() throws Exception {
                 Connection conn = null;
@@ -512,6 +513,8 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>,
                         ? em.loadCacheQry
                         : em.loadCacheRangeQuery(lowerBound != null, upperBound != null));
 
+                    stmt.setFetchSize(fetchSize);
+
                     int idx = 1;
 
                     if (lowerBound != null)
@@ -555,7 +558,7 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>,
      * @return Callable for pool submit.
      */
     private Callable<Void> loadCacheFull(EntryMapping m, IgniteBiInClosure<K, V> clo) {
-        return loadCacheRange(m, clo, null, null);
+        return loadCacheRange(m, clo, null, null, dialect.getFetchSize());
     }
 
     /**
@@ -811,10 +814,6 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>,
 
                 for (EntryMapping em : entryMappings) {
                     if (parallelLoadCacheMinThreshold > 0) {
-                        if (log.isDebugEnabled())
-                            log.debug("Multithread loading entries from db [cache=" +  U.maskName(cacheName) +
-                                ", keyType=" + em.keyType() + " ]");
-
                         Connection conn = null;
 
                         try {
@@ -827,6 +826,10 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>,
                             ResultSet rs = stmt.executeQuery();
 
                             if (rs.next()) {
+                                if (log.isDebugEnabled())
+                                    log.debug("Multithread loading entries from db [cache=" +  U.maskName(cacheName) +
+                                        ", keyType=" + em.keyType() + " ]");
+
                                 int keyCnt = em.keyCols.size();
 
                                 Object[] upperBound = new Object[keyCnt];
@@ -834,7 +837,7 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>,
                                 for (int i = 0; i < keyCnt; i++)
                                     upperBound[i] = rs.getObject(i + 1);
 
-                                futs.add(pool.submit(loadCacheRange(em, clo, null, upperBound)));
+                                futs.add(pool.submit(loadCacheRange(em, clo, null, upperBound, 0)));
 
                                 while (rs.next()) {
                                     Object[] lowerBound = upperBound;
@@ -844,28 +847,28 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>,
                                     for (int i = 0; i < keyCnt; i++)
                                         upperBound[i] = rs.getObject(i + 1);
 
-                                    futs.add(pool.submit(loadCacheRange(em, clo, lowerBound, upperBound)));
+                                    futs.add(pool.submit(loadCacheRange(em, clo, lowerBound, upperBound, 0)));
                                 }
 
-                                futs.add(pool.submit(loadCacheRange(em, clo, upperBound, null)));
+                                futs.add(pool.submit(loadCacheRange(em, clo, upperBound, null, 0)));
+
+                                continue;
                             }
-                            else
-                                futs.add(pool.submit(loadCacheFull(em, clo)));
                         }
-                        catch (SQLException ignored) {
-                            futs.add(pool.submit(loadCacheFull(em, clo)));
+                        catch (SQLException e) {
+                            log.warning("Failed to load entries from db in multithreaded mode [cache=" +  U.maskName(cacheName) +
+                                ", keyType=" + em.keyType() + " ]", e);
                         }
                         finally {
                             U.closeQuiet(conn);
                         }
                     }
-                    else {
-                        if (log.isDebugEnabled())
-                            log.debug("Single thread loading entries from db [cache=" +  U.maskName(cacheName) +
-                                ", keyType=" + em.keyType() + " ]");
 
-                        futs.add(pool.submit(loadCacheFull(em, clo)));
-                    }
+                    if (log.isDebugEnabled())
+                        log.debug("Single thread loading entries from db [cache=" +  U.maskName(cacheName) +
+                            ", keyType=" + em.keyType() + " ]");
+
+                    futs.add(pool.submit(loadCacheFull(em, clo)));
                 }
             }
 
@@ -1926,6 +1929,8 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>,
 
                 stmt = conn.prepareStatement(qry);
 
+                stmt.setFetchSize(dialect.getFetchSize());
+
                 ResultSet rs = stmt.executeQuery();
 
                 ResultSetMetaData meta = rs.getMetaData();

http://git-wip-us.apache.org/repos/asf/ignite/blob/d399db92/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/BasicJdbcDialect.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/BasicJdbcDialect.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/BasicJdbcDialect.java
index abb59d3..cd9c986 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/BasicJdbcDialect.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/BasicJdbcDialect.java
@@ -274,4 +274,9 @@ public class BasicJdbcDialect implements JdbcDialect {
     public void setMaxParameterCount(int maxParamsCnt) {
         this.maxParamsCnt = maxParamsCnt;
     }
-}
\ No newline at end of file
+
+    /** {@inheritDoc} */
+    @Override public int getFetchSize() {
+        return 0;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/d399db92/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/JdbcDialect.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/JdbcDialect.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/JdbcDialect.java
index 38e981f..9daa00b 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/JdbcDialect.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/JdbcDialect.java
@@ -115,4 +115,13 @@ public interface JdbcDialect extends Serializable {
      * @return Max query parameters count.
      */
     public int getMaxParameterCount();
-}
\ No newline at end of file
+
+    /**
+     * Gives the JDBC driver a hint how many rows should be fetched from the database when more rows are needed.
+     * If the value specified is zero, then the hint is ignored.
+     * The default value is zero.
+     *
+     * @return The fetch size for result sets.
+     */
+    public int getFetchSize();
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/d399db92/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/MySQLDialect.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/MySQLDialect.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/MySQLDialect.java
index f7512a7..84e6d05 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/MySQLDialect.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/MySQLDialect.java
@@ -29,6 +29,15 @@ public class MySQLDialect extends BasicJdbcDialect {
     private static final long serialVersionUID = 0L;
 
     /** {@inheritDoc} */
+    @Override public String loadCacheSelectRangeQuery(String fullTblName, Collection<String> keyCols) {
+        String cols = mkString(keyCols, ",");
+
+        return String.format("SELECT %s " +
+            "FROM (SELECT %s, @rownum := @rownum + 1 AS rn FROM %s, (SELECT @rownum := 0) r ORDER BY %s) as r " +
+            "WHERE mod(rn, ?) = 0", cols, cols, fullTblName, cols);
+    }
+
+    /** {@inheritDoc} */
     @Override public boolean hasMerge() {
         return true;
     }
@@ -48,4 +57,11 @@ public class MySQLDialect extends BasicJdbcDialect {
         return String.format("INSERT INTO %s (%s) VALUES (%s) ON DUPLICATE KEY UPDATE %s", fullTblName,
             mkString(cols, ", "), repeat("?", cols.size(), "", ",", ""), updPart);
     }
-}
\ No newline at end of file
+
+    /** {@inheritDoc} */
+    @Override public int getFetchSize() {
+        // Workaround for known issue with MySQL large result set.
+        // See: http://dev.mysql.com/doc/connector-j/5.1/en/connector-j-reference-implementation-notes.html
+        return Integer.MIN_VALUE;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/d399db92/modules/spring/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreFactorySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/spring/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreFactorySelfTest.java b/modules/spring/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreFactorySelfTest.java
index 2f36017..dfa1452 100644
--- a/modules/spring/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreFactorySelfTest.java
+++ b/modules/spring/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreFactorySelfTest.java
@@ -129,9 +129,9 @@ public class CacheJdbcPojoStoreFactorySelfTest extends GridCommonAbstractTest {
     }
 
     /**
-     *
+     * Dummy JDBC dialect that does nothing.
      */
-    public static class DummyDialect implements JdbcDialect, Serializable {
+    public static class DummyDialect implements JdbcDialect {
         /** {@inheritDoc} */
         @Override public String loadCacheSelectRangeQuery(String fullTblName, Collection<String> keyCols) {
             return null;
@@ -185,5 +185,10 @@ public class CacheJdbcPojoStoreFactorySelfTest extends GridCommonAbstractTest {
         @Override public int getMaxParameterCount() {
             return 0;
         }
+
+        /** {@inheritDoc} */
+        @Override public int getFetchSize() {
+            return 0;
+        }
     }
-}
\ No newline at end of file
+}


Mime
View raw message