Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 510F017641 for ; Tue, 13 Jan 2015 06:26:17 +0000 (UTC) Received: (qmail 88646 invoked by uid 500); 13 Jan 2015 06:26:19 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 88616 invoked by uid 500); 13 Jan 2015 06:26:18 -0000 Mailing-List: contact commits-help@ignite.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.incubator.apache.org Delivered-To: mailing list commits@ignite.incubator.apache.org Received: (qmail 88607 invoked by uid 99); 13 Jan 2015 06:26:18 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 13 Jan 2015 06:26:18 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED,T_RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Tue, 13 Jan 2015 06:25:55 +0000 Received: (qmail 88473 invoked by uid 99); 13 Jan 2015 06:25:52 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 13 Jan 2015 06:25:52 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 3DEF0A05CB2; Tue, 13 Jan 2015 06:25:52 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.incubator.apache.org Message-Id: <2c90be872faa46e7846f276419b43bfd@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: incubator-ignite git commit: # IGNITE-32 WIP: Store implementation batch size and pool size. Date: Tue, 13 Jan 2015 06:25:52 +0000 (UTC) X-Virus-Checked: Checked by ClamAV on apache.org Repository: incubator-ignite Updated Branches: refs/heads/ignite-32 fcfe5019d -> eaeca2e5a # IGNITE-32 WIP: Store implementation batch size and pool size. Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/eaeca2e5 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/eaeca2e5 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/eaeca2e5 Branch: refs/heads/ignite-32 Commit: eaeca2e5ae95c4f7bc9ef5fffe33ca7cee081057 Parents: fcfe501 Author: AKuznetsov Authored: Tue Jan 13 13:26:16 2015 +0700 Committer: AKuznetsov Committed: Tue Jan 13 13:26:16 2015 +0700 ---------------------------------------------------------------------- .../grid/cache/store/auto/AutoCacheStore.java | 320 ++++++++++++------- 1 file changed, 200 insertions(+), 120 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eaeca2e5/modules/core/src/main/java/org/gridgain/grid/cache/store/auto/AutoCacheStore.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/cache/store/auto/AutoCacheStore.java b/modules/core/src/main/java/org/gridgain/grid/cache/store/auto/AutoCacheStore.java index ff7974b..ee56152 100644 --- a/modules/core/src/main/java/org/gridgain/grid/cache/store/auto/AutoCacheStore.java +++ b/modules/core/src/main/java/org/gridgain/grid/cache/store/auto/AutoCacheStore.java @@ -54,8 +54,8 @@ public abstract class AutoCacheStore implements GridCacheStore { /** Remove item(s) query. */ protected final String remQry; - /** Batch size for load query. */ - protected final int loadBatchSize; + /** Max key count for load query per statement. */ + protected final int maxKeysPerStmt; /** Database table name. */ private final String tblName; @@ -70,10 +70,10 @@ public abstract class AutoCacheStore implements GridCacheStore { private final Set uniqCols; /** Mapper for key. */ - protected JdbcMapper keyMapper; + protected final JdbcMapper keyMapper; /** Mapper for value. */ - protected JdbcMapper valMapper; + protected final JdbcMapper valMapper; /** * @@ -96,9 +96,9 @@ public abstract class AutoCacheStore implements GridCacheStore { loadQrySingle = loadQuery(tblName, keyCols, valCols, 1); - loadBatchSize = MAX_QRY_PARAMETERS / keyCols.size(); + maxKeysPerStmt = maxParamsCnt / keyCols.size(); - loadQry = loadQuery(tblName, keyCols, uniqCols, loadBatchSize); + loadQry = loadQuery(tblName, keyCols, uniqCols, maxKeysPerStmt); putQry = putQuery(tblName, keyCols, uniqCols); @@ -110,13 +110,13 @@ public abstract class AutoCacheStore implements GridCacheStore { } /** - * Construct query for select values with key count less or equal {@code loadBatchSize} + * Construct query for select values with key count less or equal {@code maxKeysPerStmt} * @param keyCnt Key count. */ protected String loadQueryLast(int keyCnt) { - assert keyCnt >= loadBatchSize; + assert keyCnt >= maxKeysPerStmt; - if (keyCnt == loadBatchSize) + if (keyCnt == maxKeysPerStmt) return loadQry; if (keyCnt == 1) @@ -126,8 +126,11 @@ public abstract class AutoCacheStore implements GridCacheStore { } } - /** Max query parameters count. */ - protected static final int MAX_QRY_PARAMETERS = 2000; + /** Default max query parameters count. */ + protected static final int DFLT_MAX_PARAMS_CNT = 2000; + + /** Default batch size for put and remove operations. */ + protected static final int DFLT_BATCH_SIZE = 512; /** Connection attribute name. */ protected static final String ATTR_CONN = "JDBC_STORE_CONNECTION"; @@ -165,7 +168,7 @@ public abstract class AutoCacheStore implements GridCacheStore { protected String passwd; /** Execute. */ - protected ExecutorService exec = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); + protected ExecutorService exec; /** Paths to xml with type mapping description. */ protected Collection typeMetadataPaths; @@ -176,6 +179,15 @@ public abstract class AutoCacheStore implements GridCacheStore { /** Type cache. */ protected Map typesCache; + /** Max workers thread count. These threads are responsible for execute query. */ + protected int maxPoolSz = Runtime.getRuntime().availableProcessors(); + + /** Max query parameters count. */ + protected int maxParamsCnt = DFLT_MAX_PARAMS_CNT; + + /** Maximum batch size for put and remove operations. */ + protected int batchSz = DFLT_BATCH_SIZE; + /** * Initializes store. * @@ -219,6 +231,8 @@ public abstract class AutoCacheStore implements GridCacheStore { setTypeMetadata(typeMeta); } + exec = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); + buildTypeCache(); initOk = true; @@ -422,87 +436,6 @@ public abstract class AutoCacheStore implements GridCacheStore { } /** - * @return Data source. - */ - public DataSource getDataSource() { - return dataSrc; - } - - /** - * @param dataSrc Data source. - */ - public void setDataSource(DataSource dataSrc) { - this.dataSrc = dataSrc; - } - - /** - * @return Connection URL. - */ - public String getConnUrl() { - return connUrl; - } - - /** - * @param connUrl Connection URL. - */ - public void setConnUrl(String connUrl) { - this.connUrl = connUrl; - } - - /** - * @return Password for database access. - */ - public String getPassword() { - return passwd; - } - - /** - * @param passwd Password for database access. - */ - public void setPassword(String passwd) { - this.passwd = passwd; - } - - /** - * @return User name for database access. - */ - public String getUser() { - return user; - } - - /** - * @param user User name for database access. - */ - public void setUser(String user) { - this.user = user; - } - - /** - * @return Paths to xml with type mapping description. - */ - public Collection getTypeMetadataPaths() { - return typeMetadataPaths; - } - - /** - * Set paths to xml with type mapping description. - * - * @param typeMetadataPaths Paths to xml. - */ - public void setTypeMetadataPaths(Collection typeMetadataPaths) { - this.typeMetadataPaths = typeMetadataPaths; - } - - /** - * Set type mapping description. - * - * @param typeMetadata Type mapping description. - */ - public void setTypeMetadata(Collection typeMetadata) { - this.typeMetadata = typeMetadata; - } - - /** * Construct load cache query. * * @param tblName Database table name. @@ -525,7 +458,7 @@ public abstract class AutoCacheStore implements GridCacheStore { protected String loadQuery(String tblName, Collection keyCols, Iterable valCols, int keyCnt) { assert !keyCols.isEmpty(); - assert keyCols.size() * keyCnt <= MAX_QRY_PARAMETERS; + assert keyCols.size() * keyCnt <= maxParamsCnt; SB sb = new SB(String.format("SELECT %s FROM %s WHERE ", mkString(valCols, ","), tblName)); @@ -606,37 +539,37 @@ public abstract class AutoCacheStore implements GridCacheStore { for (final TypeCache type : typesCache.values()) futs.add(exec.submit(new Callable() { @Override public Void call() throws Exception { - Connection conn = null; + Connection conn = null; - try { - PreparedStatement stmt = null; + try { + PreparedStatement stmt = null; - try { - conn = connection(null); + try { + conn = connection(null); - stmt = conn.prepareStatement(type.loadCacheQry); + stmt = conn.prepareStatement(type.loadCacheQry); - ResultSet rs = stmt.executeQuery(); + ResultSet rs = stmt.executeQuery(); - while (rs.next()) { - K key = type.keyMapper.readObject(ignite, rs); - V val = type.valMapper.readObject(ignite, rs); + while (rs.next()) { + K key = type.keyMapper.readObject(ignite, rs); + V val = type.valMapper.readObject(ignite, rs); - clo.apply(key, val); - } - } - catch (SQLException e) { - throw new IgniteCheckedException("Failed to load cache", e); - } - finally { - U.closeQuiet(stmt); + clo.apply(key, val); } } + catch (SQLException e) { + throw new IgniteCheckedException("Failed to load cache", e); + } finally { - closeConnection(conn); + U.closeQuiet(stmt); } + } + finally { + closeConnection(conn); + } - return null; + return null; } })); @@ -734,7 +667,7 @@ public abstract class AutoCacheStore implements GridCacheStore { } /** {@inheritDoc} */ - @Override public final void loadAll(@Nullable final IgniteTx tx, Collection keys, + @Override public void loadAll(@Nullable final IgniteTx tx, Collection keys, final IgniteBiInClosure c) throws IgniteCheckedException { Map> splittedKeys = U.newHashMap(typesCache.size()); @@ -751,7 +684,7 @@ public abstract class AutoCacheStore implements GridCacheStore { batch.add(key); - if (batch.size() == typesCache.get(typeKey).loadBatchSize) { + if (batch.size() == typesCache.get(typeKey).maxKeysPerStmt) { final Collection p = splittedKeys.remove(typeKey); futs.add(exec.submit(new Callable() { @@ -840,15 +773,21 @@ public abstract class AutoCacheStore implements GridCacheStore { stmt = conn.prepareStatement(type.putQry); + int cnt = 0; + for (Map.Entry entry : map) { int startIdx = type.keyMapper.setParameters(stmt, 1, entry.getKey()); type.valMapper.setParameters(stmt, startIdx, entry.getValue()); stmt.addBatch(); + + if (cnt++ % batchSz == 0) + stmt.executeBatch(); } - stmt.executeBatch(); + if (cnt % batchSz != 0) + stmt.executeBatch(); } catch (SQLException e) { throw new IgniteCheckedException("Failed to put objects", e); @@ -859,7 +798,7 @@ public abstract class AutoCacheStore implements GridCacheStore { } /** {@inheritDoc} */ - @Override public final void putAll(@Nullable final IgniteTx tx, Map map) + @Override public void putAll(@Nullable final IgniteTx tx, Map map) throws IgniteCheckedException { Map>> keyByType = U.newHashMap(typesCache.size()); @@ -953,13 +892,19 @@ public abstract class AutoCacheStore implements GridCacheStore { stmt = conn.prepareStatement(type.remQry); + int cnt = 0; + for (K key : keys) { type.keyMapper.setParameters(stmt, 1, key); stmt.addBatch(); + + if (cnt++ % batchSz == 0) + stmt.executeBatch(); } - stmt.executeBatch(); + if (cnt % batchSz != 0) + stmt.executeBatch(); } catch (SQLException e) { throw new IgniteCheckedException("Failed to remove values by keys.", e); @@ -970,7 +915,7 @@ public abstract class AutoCacheStore implements GridCacheStore { } /** {@inheritDoc} */ - @Override public final void removeAll(@Nullable IgniteTx tx, Collection keys) + @Override public void removeAll(@Nullable IgniteTx tx, Collection keys) throws IgniteCheckedException { Map> keyByType = U.newHashMap(typesCache.size()); @@ -988,4 +933,139 @@ public abstract class AutoCacheStore implements GridCacheStore { for (Map.Entry> entry : keyByType.entrySet()) removeAll(tx, entry.getKey(), entry.getValue()); } + + /** + * @return Data source. + */ + public DataSource getDataSource() { + return dataSrc; + } + + /** + * @param dataSrc Data source. + */ + public void setDataSource(DataSource dataSrc) { + this.dataSrc = dataSrc; + } + + /** + * @return Connection URL. + */ + public String getConnUrl() { + return connUrl; + } + + /** + * @param connUrl Connection URL. + */ + public void setConnUrl(String connUrl) { + this.connUrl = connUrl; + } + + /** + * @return Password for database access. + */ + public String getPassword() { + return passwd; + } + + /** + * @param passwd Password for database access. + */ + public void setPassword(String passwd) { + this.passwd = passwd; + } + + /** + * @return User name for database access. + */ + public String getUser() { + return user; + } + + /** + * @param user User name for database access. + */ + public void setUser(String user) { + this.user = user; + } + + /** + * @return Paths to xml with type mapping description. + */ + public Collection getTypeMetadataPaths() { + return typeMetadataPaths; + } + + /** + * Set paths to xml with type mapping description. + * + * @param typeMetadataPaths Paths to xml. + */ + public void setTypeMetadataPaths(Collection typeMetadataPaths) { + this.typeMetadataPaths = typeMetadataPaths; + } + + /** + * Set type mapping description. + * + * @param typeMetadata Type mapping description. + */ + public void setTypeMetadata(Collection typeMetadata) { + this.typeMetadata = typeMetadata; + } + + /** + * Get Max workers thread count. These threads are responsible for execute query. + * + * @return Max workers thread count. + */ + public int getMaxPoolSize() { + return maxPoolSz; + } + + /** + * Set Max workers thread count. These threads are responsible for execute query. + * + * @param maxPoolSz Max workers thread count. + */ + public void setMaxPoolSize(int maxPoolSz) { + this.maxPoolSz = maxPoolSz; + } + + /** + * Get max query parameters count. + * + * @return Max query parameters count. + */ + public int getMaxParamsCnt() { + return maxParamsCnt; + } + + /** + * Set max query parameters count. + * + * @param maxParamsCnt Max query parameters count. + */ + public void setMaxParamsCnt(int maxParamsCnt) { + this.maxParamsCnt = maxParamsCnt; + } + + /** + * Get maximum batch size for put and remove operations. + * + * @return Maximum batch size. + */ + public int getBatchSize() { + return batchSz; + } + + /** + * Set maximum batch size for put and remove operations. + * + * @param batchSz Maximum batch size. + */ + public void setBatchSize(int batchSz) { + this.batchSz = batchSz; + } }