Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 5637918DD8 for ; Tue, 27 Oct 2015 12:03:49 +0000 (UTC) Received: (qmail 19664 invoked by uid 500); 27 Oct 2015 12:03:43 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 19625 invoked by uid 500); 27 Oct 2015 12:03:43 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 19616 invoked by uid 99); 27 Oct 2015 12:03:43 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 27 Oct 2015 12:03:43 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id EE41EDFF81; Tue, 27 Oct 2015 12:03:42 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: akuznetsov@apache.org To: commits@ignite.apache.org Date: Tue, 27 Oct 2015 12:03:43 -0000 Message-Id: In-Reply-To: <5247604e0a944f9dad2b5dac87612e04@git.apache.org> References: <5247604e0a944f9dad2b5dac87612e04@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/2] ignite git commit: IGNITE-1753 Debug slow portables. IGNITE-1753 Debug slow portables. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8d325a4c Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8d325a4c Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8d325a4c Branch: refs/heads/ignite-1753-1282 Commit: 8d325a4c4b7b76d1f1402c45d785cb9ffbfce6c1 Parents: 92da142 Author: Alexey Kuznetsov Authored: Tue Oct 27 19:03:35 2015 +0700 Committer: Alexey Kuznetsov Committed: Tue Oct 27 19:03:35 2015 +0700 ---------------------------------------------------------------------- .../store/jdbc/CacheAbstractJdbcStore.java | 47 ++--- .../cache/store/jdbc/CacheJdbcPojoStore.java | 62 +++--- .../store/jdbc/CacheJdbcPojoStoreSelfTest.java | 64 ++++++ .../jdbc/CacheJdbcPortableStoreSelfTest.java | 93 +++++++++ .../jdbc/CacheJdbcStoreAbstractSelfTest.java | 204 +++++++++++++++++++ 5 files changed, 423 insertions(+), 47 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/8d325a4c/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java index bd04fe7..8b3d44a 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java @@ -669,7 +669,7 @@ public abstract class CacheAbstractJdbcStore implements CacheStore, throw new CacheLoaderException("Expected even number of arguments, but found: " + args.length); if (log.isDebugEnabled()) - log.debug("Start loading entries from db using user queries from arguments"); + log.debug("Start loading entries from db using user queries from arguments..."); for (int i = 0; i < args.length; i += 2) { String keyType = args[i].toString(); @@ -686,8 +686,9 @@ public abstract class CacheAbstractJdbcStore implements CacheStore, for (EntryMapping em : entryMappings) { if (parallelLoadCacheMinThreshold > 0) { - log.debug("Multithread loading entries from db [cache name=" + cacheName + - ", key type=" + em.keyType() + " ]"); + if (log.isDebugEnabled()) + log.debug("Multithread loading entries from db [cache=" + cacheName + + ", keyType=" + em.keyType() + " ]"); Connection conn = null; @@ -735,8 +736,8 @@ public abstract class CacheAbstractJdbcStore implements CacheStore, } else { if (log.isDebugEnabled()) - log.debug("Single thread loading entries from db [cache name=" + cacheName + - ", key type=" + em.keyType() + " ]"); + log.debug("Single thread loading entries from db [cache=" + cacheName + + ", keyType=" + em.keyType() + " ]"); futs.add(pool.submit(loadCacheFull(em, clo))); } @@ -1014,8 +1015,8 @@ public abstract class CacheAbstractJdbcStore implements CacheStore, if (currKeyTypeId == null || !currKeyTypeId.equals(keyTypeId)) { if (mergeStmt != null) { if (log.isDebugEnabled()) - log.debug("Write entries to db [cache name=" + cacheName + - ", key type=" + em.keyType() + ", count=" + prepared + "]"); + log.debug("Write entries to db [cache=" + cacheName + + ", keyType=" + em.keyType() + ", cnt=" + prepared + "]"); executeBatch(em, mergeStmt, "writeAll", fromIdx, prepared, lazyEntries); @@ -1039,8 +1040,8 @@ public abstract class CacheAbstractJdbcStore implements CacheStore, if (++prepared % batchSz == 0) { if (log.isDebugEnabled()) - log.debug("Write entries to db [cache name=" + cacheName + - ", key type=" + em.keyType() + ", count=" + prepared + "]"); + log.debug("Write entries to db [cache=" + cacheName + + ", keyType=" + em.keyType() + ", cnt=" + prepared + "]"); executeBatch(em, mergeStmt, "writeAll", fromIdx, prepared, lazyEntries); @@ -1052,8 +1053,8 @@ public abstract class CacheAbstractJdbcStore implements CacheStore, if (mergeStmt != null && prepared % batchSz != 0) { if (log.isDebugEnabled()) - log.debug("Write entries to db [cache name=" + cacheName + - ", key type=" + em.keyType() + ", count=" + prepared + "]"); + log.debug("Write entries to db [cache=" + cacheName + + ", keyType=" + em.keyType() + ", cnt=" + prepared + "]"); executeBatch(em, mergeStmt, "writeAll", fromIdx, prepared, lazyEntries); @@ -1064,8 +1065,9 @@ public abstract class CacheAbstractJdbcStore implements CacheStore, } } else { - log.debug("Write entries to db one by one using update and insert statements [cache name=" + - cacheName + ", count=" + entries.size() + "]"); + if (log.isDebugEnabled()) + log.debug("Write entries to db one by one using update and insert statements [cache=" + cacheName + + ", cnt=" + entries.size() + "]"); PreparedStatement insStmt = null; @@ -1229,8 +1231,8 @@ public abstract class CacheAbstractJdbcStore implements CacheStore, if (!currKeyTypeId.equals(keyTypeId)) { if (log.isDebugEnabled()) - log.debug("Delete entries from db [cache name=" + cacheName + - ", key type=" + em.keyType() + ", count=" + prepared + "]"); + log.debug("Delete entries from db [cache=" + cacheName + ", keyType=" + em.keyType() + + ", cnt=" + prepared + "]"); executeBatch(em, delStmt, "deleteAll", fromIdx, prepared, lazyKeys); @@ -1247,8 +1249,8 @@ public abstract class CacheAbstractJdbcStore implements CacheStore, if (++prepared % batchSz == 0) { if (log.isDebugEnabled()) - log.debug("Delete entries from db [cache name=" + cacheName + - ", key type=" + em.keyType() + ", count=" + prepared + "]"); + log.debug("Delete entries from db [cache=" + cacheName + ", keyType=" + em.keyType() + + ", cnt=" + prepared + "]"); executeBatch(em, delStmt, "deleteAll", fromIdx, prepared, lazyKeys); @@ -1260,8 +1262,8 @@ public abstract class CacheAbstractJdbcStore implements CacheStore, if (delStmt != null && prepared % batchSz != 0) { if (log.isDebugEnabled()) - log.debug("Delete entries from db [cache name=" + cacheName + - ", key type=" + em.keyType() + ", count=" + prepared + "]"); + log.debug("Delete entries from db [cache=" + cacheName + ", keyType=" + em.keyType() + + ", cnt=" + prepared + "]"); executeBatch(em, delStmt, "deleteAll", fromIdx, prepared, lazyKeys); } @@ -1691,8 +1693,8 @@ public abstract class CacheAbstractJdbcStore implements CacheStore, /** {@inheritDoc} */ @Override public Void call() throws Exception { if (log.isDebugEnabled()) - log.debug("Load cache using custom query [cache name= " + em.cacheName + - ", key type=" + em.keyType() + ", query=" + qry + "]"); + log.debug("Load cache using custom query [cache= " + em.cacheName + ", keyType=" + em.keyType() + + ", query=" + qry + "]"); Connection conn = null; @@ -1787,8 +1789,7 @@ public abstract class CacheAbstractJdbcStore implements CacheStore, /** {@inheritDoc} */ @Override public Map call() throws Exception { if (log.isDebugEnabled()) - log.debug("Load values from db [table= " + em.fullTableName() + - ", key count=" + keys.size() + "]"); + log.debug("Load values from db [table= " + em.fullTableName() + ", keysCnt=" + keys.size() + "]"); PreparedStatement stmt = null; http://git-wip-us.apache.org/repos/asf/ignite/blob/8d325a4c/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStore.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStore.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStore.java index fa718be..f8abd16 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStore.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStore.java @@ -113,6 +113,9 @@ public class CacheJdbcPojoStore implements CacheStore, LifecycleAwar /** Data source. */ private DataSource dataSrc; + /** Portables. */ + private IgnitePortables portables; + /** Cache with entry mapping description. (cache name, (key id, mapping description)). */ private volatile Map> cacheMappings = Collections.emptyMap(); @@ -141,7 +144,7 @@ public class CacheJdbcPojoStore implements CacheStore, LifecycleAwar private volatile Map> pojoMethods = Collections.emptyMap(); /** Portables builders cache. */ - private volatile Map> portableBuilders = Collections.emptyMap(); + private volatile Map> portableTypeIds = Collections.emptyMap(); /** * Checks for POJO/portable format. @@ -275,12 +278,12 @@ public class CacheJdbcPojoStore implements CacheStore, LifecycleAwar CacheJdbcPojoStoreTypeField[] fields, Map loadColIdxs, ResultSet rs) throws CacheLoaderException { - Map z = pojoMethods.get(cacheName); + Map cacheMethods = pojoMethods.get(cacheName); - if (z == null) + if (cacheMethods == null) throw new CacheLoaderException("Failed to find POJO types metadata for cache: " + cacheName); - PojoMethodsCache mc = z.get(typeName); + PojoMethodsCache mc = cacheMethods.get(typeName); if (mc == null) throw new CacheLoaderException("Failed to find POJO type metadata for type: " + typeName); @@ -339,12 +342,17 @@ public class CacheJdbcPojoStore implements CacheStore, LifecycleAwar */ protected PortableObject buildPortableObject(String cacheName, String typeName, CacheJdbcPojoStoreTypeField[] fields, Map loadColIdxs, ResultSet rs) throws CacheException { - Map cachePortables = portableBuilders.get(cacheName); + Map cacheTypeIds = portableTypeIds.get(cacheName); + + if (cacheTypeIds == null) + throw new CacheLoaderException("Failed to find portable types IDs for cache: " + cacheName); + + Integer typeId = cacheTypeIds.get(typeName); - if (cachePortables == null) - throw new CacheException("Failed to find portable builders for cache: " + cacheName); + if (typeId == null) + throw new CacheLoaderException("Failed to find portable type ID for type: " + typeName); - PortableBuilder builder = cachePortables.get(typeName); + PortableBuilder builder = portables.builder(typeId); if (builder == null) throw new CacheException("Failed to find portable builder for type: " + typeName); @@ -389,7 +397,7 @@ public class CacheJdbcPojoStore implements CacheStore, LifecycleAwar */ private Object typeIdForTypeName(boolean keepSerialized, String typeName) throws CacheException { if (keepSerialized) - return ignite().portables().typeId(typeName); + return portables.typeId(typeName); try { return Class.forName(typeName); @@ -444,7 +452,7 @@ public class CacheJdbcPojoStore implements CacheStore, LifecycleAwar */ private void preparePortableBuilders(@Nullable String cacheName, Collection types) throws CacheException { - Map typeBuilders = U.newHashMap(types.size() * 2); + Map typeIds = U.newHashMap(types.size() * 2); for (CacheJdbcPojoStoreType type : types) { if (type.isKeepSerialized()) { @@ -453,21 +461,19 @@ public class CacheJdbcPojoStore implements CacheStore, LifecycleAwar IgnitePortables portables = ignite.portables(); String keyType = type.getKeyType(); - int keyTypeId = portables.typeId(keyType); - typeBuilders.put(keyType, portables.builder(keyTypeId)); + typeIds.put(keyType, portables.typeId(keyType)); String valType = type.getValueType(); - int valTypeId = portables.typeId(valType); - typeBuilders.put(valType, portables.builder(valTypeId)); + typeIds.put(valType, portables.typeId(valType)); } } - if (!typeBuilders.isEmpty()) { - Map> newBuilders = new HashMap<>(portableBuilders); + if (!typeIds.isEmpty()) { + Map> newBuilders = new HashMap<>(portableTypeIds); - newBuilders.put(cacheName, typeBuilders); + newBuilders.put(cacheName, typeIds); - portableBuilders = newBuilders; + portableTypeIds = newBuilders; } } @@ -525,6 +531,8 @@ public class CacheJdbcPojoStore implements CacheStore, LifecycleAwar if (log.isDebugEnabled() && dialect.getClass() != BasicJdbcDialect.class) log.debug("Resolved database dialect: " + U.getSimpleName(dialect.getClass())); } + + portables = ignite.portables(); } /** {@inheritDoc} */ @@ -741,6 +749,8 @@ public class CacheJdbcPojoStore implements CacheStore, LifecycleAwar ResultSet rs = stmt.executeQuery(); + long t = System.currentTimeMillis(); + while (rs.next()) { K key = buildObject(em.cacheName, em.keyType(), em.keyColumns(), em.loadColIdxs, rs); V val = buildObject(em.cacheName, em.valueType(), em.valueColumns(), em.loadColIdxs, rs); @@ -927,8 +937,10 @@ public class CacheJdbcPojoStore implements CacheStore, LifecycleAwar throw new CacheException("Key type must be unique in type metadata [cache name=" + cacheName + ", key type=" + keyType + "]"); - checkMapping(cacheName, keyType, type.getKeyFields()); - checkMapping(cacheName, valType, type.getValueFields()); + if (!keepSerialized) { + checkMapping(cacheName, keyType, type.getKeyFields()); + checkMapping(cacheName, valType, type.getValueFields()); + } entryMappings.put(keyTypeId, new EntryMapping(cacheName, dialect, type)); } @@ -1009,8 +1021,9 @@ public class CacheJdbcPojoStore implements CacheStore, LifecycleAwar for (EntryMapping em : entryMappings) { if (parallelLoadCacheMinThreshold > 0) { - log.debug("Multithread loading entries from db [cache name=" + cacheName + - ", key type=" + em.keyType() + " ]"); + if (log.isDebugEnabled()) + log.debug("Multithread loading entries from db [cache name=" + cacheName + + ", key type=" + em.keyType() + " ]"); Connection conn = null; @@ -1387,8 +1400,9 @@ public class CacheJdbcPojoStore implements CacheStore, LifecycleAwar } } else { - log.debug("Write entries to db one by one using update and insert statements [cache name=" + - cacheName + ", count=" + entries.size() + "]"); + if (log.isDebugEnabled()) + log.debug("Write entries to db one by one using update and insert statements [cache name=" + + cacheName + ", count=" + entries.size() + "]"); PreparedStatement insStmt = null; http://git-wip-us.apache.org/repos/asf/ignite/blob/8d325a4c/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreSelfTest.java new file mode 100644 index 0000000..9f50507 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreSelfTest.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.store.jdbc; + +import java.sql.Types; +import org.apache.ignite.marshaller.Marshaller; +import org.apache.ignite.marshaller.optimized.OptimizedMarshaller; +import org.apache.ignite.marshaller.portable.PortableMarshaller; + +/** + * Class for {@code PojoCacheStore} tests. + */ +public class CacheJdbcPojoStoreSelfTest extends CacheJdbcStoreAbstractSelfTest { + /** {@inheritDoc} */ + @Override protected Marshaller marshaller(){ + OptimizedMarshaller marsh = new OptimizedMarshaller(); + + return marsh; + } + + /** {@inheritDoc} */ + @Override protected CacheJdbcPojoStoreType[] storeTypes() { + CacheJdbcPojoStoreType[] storeTypes = new CacheJdbcPojoStoreType[2]; + + storeTypes[0] = new CacheJdbcPojoStoreType(); + storeTypes[0].setDatabaseSchema("PUBLIC"); + storeTypes[0].setDatabaseTable("ORGANIZATION"); + storeTypes[0].setKeyType("org.apache.ignite.cache.store.jdbc.model.OrganizationKey"); + storeTypes[0].setKeyFields(new CacheJdbcPojoStoreTypeField(Types.INTEGER, "ID", Integer.class, "id")); + storeTypes[0].setValueType("org.apache.ignite.cache.store.jdbc.model.Organization"); + storeTypes[0].setValueFields( + new CacheJdbcPojoStoreTypeField(Types.INTEGER, "ID", Integer.class, "id"), + new CacheJdbcPojoStoreTypeField(Types.VARCHAR, "NAME", String.class, "name"), + new CacheJdbcPojoStoreTypeField(Types.VARCHAR, "CITY", String.class, "city")); + + storeTypes[1] = new CacheJdbcPojoStoreType(); + storeTypes[1].setDatabaseSchema("PUBLIC"); + storeTypes[1].setDatabaseTable("PERSON"); + storeTypes[1].setKeyType("org.apache.ignite.cache.store.jdbc.model.PersonKey"); + storeTypes[1].setKeyFields(new CacheJdbcPojoStoreTypeField(Types.INTEGER, "ID", Integer.class, "id")); + storeTypes[1].setValueType("org.apache.ignite.cache.store.jdbc.model.Person"); + storeTypes[1].setValueFields( + new CacheJdbcPojoStoreTypeField(Types.INTEGER, "ID", Integer.class, "id"), + new CacheJdbcPojoStoreTypeField(Types.INTEGER, "ORG_ID", Integer.class, "orgId"), + new CacheJdbcPojoStoreTypeField(Types.VARCHAR, "NAME", String.class, "name")); + + return storeTypes; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/8d325a4c/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPortableStoreSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPortableStoreSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPortableStoreSelfTest.java new file mode 100644 index 0000000..4b533b2 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPortableStoreSelfTest.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.store.jdbc; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.sql.Statement; +import java.sql.Types; +import java.util.ArrayList; +import java.util.Collection; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.store.jdbc.dialect.H2Dialect; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.marshaller.Marshaller; +import org.apache.ignite.marshaller.portable.PortableMarshaller; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.h2.jdbcx.JdbcConnectionPool; + +import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; +import static org.apache.ignite.cache.CacheMode.PARTITIONED; + +/** + * Class for {@code PojoCacheStore} tests. + */ +public class CacheJdbcPortableStoreSelfTest extends CacheJdbcStoreAbstractSelfTest { + /** {@inheritDoc} */ + @Override protected Marshaller marshaller(){ + PortableMarshaller marsh = new PortableMarshaller(); + + Collection clsNames = new ArrayList<>(); + clsNames.add("org.apache.ignite.cache.store.jdbc.model.OrganizationKey"); + clsNames.add("org.apache.ignite.cache.store.jdbc.model.Organization"); + clsNames.add("org.apache.ignite.cache.store.jdbc.model.PersonKey"); + clsNames.add("org.apache.ignite.cache.store.jdbc.model.Person"); + + marsh.setClassNames(clsNames); + + return marsh; + } + + /** {@inheritDoc} */ + @Override protected CacheJdbcPojoStoreType[] storeTypes() { + CacheJdbcPojoStoreType[] storeTypes = new CacheJdbcPojoStoreType[2]; + + storeTypes[0] = new CacheJdbcPojoStoreType(); + storeTypes[0].setKeepSerialized(true); + storeTypes[0].setDatabaseSchema("PUBLIC"); + storeTypes[0].setDatabaseTable("ORGANIZATION"); + storeTypes[0].setKeyType("org.apache.ignite.cache.store.jdbc.model.OrganizationKey"); + storeTypes[0].setKeyFields(new CacheJdbcPojoStoreTypeField(Types.INTEGER, "ID", Integer.class, "id")); + storeTypes[0].setValueType("org.apache.ignite.cache.store.jdbc.model.Organization"); + storeTypes[0].setValueFields( + new CacheJdbcPojoStoreTypeField(Types.INTEGER, "ID", Integer.class, "id"), + new CacheJdbcPojoStoreTypeField(Types.VARCHAR, "NAME", String.class, "name"), + new CacheJdbcPojoStoreTypeField(Types.VARCHAR, "CITY", String.class, "city")); + + storeTypes[1] = new CacheJdbcPojoStoreType(); + storeTypes[1].setKeepSerialized(true); + storeTypes[1].setDatabaseSchema("PUBLIC"); + storeTypes[1].setDatabaseTable("PERSON"); + storeTypes[1].setKeyType("org.apache.ignite.cache.store.jdbc.model.PersonKey"); + storeTypes[1].setKeyFields(new CacheJdbcPojoStoreTypeField(Types.INTEGER, "ID", Integer.class, "id")); + storeTypes[1].setValueType("org.apache.ignite.cache.store.jdbc.model.Person"); + storeTypes[1].setValueFields( + new CacheJdbcPojoStoreTypeField(Types.INTEGER, "ID", Integer.class, "id"), + new CacheJdbcPojoStoreTypeField(Types.INTEGER, "ORG_ID", Integer.class, "orgId"), + new CacheJdbcPojoStoreTypeField(Types.VARCHAR, "NAME", String.class, "name")); + + return storeTypes; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/8d325a4c/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcStoreAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcStoreAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcStoreAbstractSelfTest.java new file mode 100644 index 0000000..c083e93 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcStoreAbstractSelfTest.java @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.store.jdbc; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.sql.Statement; +import java.sql.Types; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.store.jdbc.dialect.H2Dialect; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.ConnectorConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.marshaller.Marshaller; +import org.apache.ignite.marshaller.portable.PortableMarshaller; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.h2.jdbcx.JdbcConnectionPool; + +import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; +import static org.apache.ignite.cache.CacheMode.PARTITIONED; + +/** + * Class for {@code PojoCacheStore} tests. + */ +public abstract class CacheJdbcStoreAbstractSelfTest extends GridCommonAbstractTest { + /** IP finder. */ + protected static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** DB connection URL. */ + protected static final String DFLT_CONN_URL = "jdbc:h2:mem:TestDatabase;DB_CLOSE_DELAY=-1"; + + /** Organization count. */ + protected static final int ORGANIZATION_CNT = 1000; + + /** Person count. */ + protected static final int PERSON_CNT = 100000; + + /** + * @return Connection to test in-memory H2 database. + * @throws SQLException + */ + protected Connection getConnection() throws SQLException { + return DriverManager.getConnection(DFLT_CONN_URL, "sa", ""); + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + Connection conn = getConnection(); + + Statement stmt = conn.createStatement(); + + stmt.executeUpdate("DROP TABLE IF EXISTS Organization"); + stmt.executeUpdate("DROP TABLE IF EXISTS Person"); + + stmt.executeUpdate("CREATE TABLE Organization (id integer PRIMARY KEY, name varchar(50), city varchar(50))"); + stmt.executeUpdate("CREATE TABLE Person (id integer PRIMARY KEY, org_id integer, name varchar(50))"); + + conn.commit(); + + U.closeQuiet(stmt); + + U.closeQuiet(conn); + + startGrid(); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(IP_FINDER); + + cfg.setDiscoverySpi(disco); + + cfg.setCacheConfiguration(cacheConfiguration()); + + cfg.setMarshaller(marshaller()); + + ConnectorConfiguration connCfg = new ConnectorConfiguration(); + cfg.setConnectorConfiguration(connCfg); + + return cfg; + } + + protected abstract Marshaller marshaller(); + + /** */ + protected CacheJdbcPojoStoreConfiguration storeConfiguration() { + CacheJdbcPojoStoreConfiguration storeCfg = new CacheJdbcPojoStoreConfiguration(); + + storeCfg.setDialect(new H2Dialect()); + + storeCfg.setTypes(storeTypes()); + + return storeCfg; + } + + protected abstract CacheJdbcPojoStoreType[] storeTypes(); + + /** */ + protected CacheConfiguration cacheConfiguration() throws Exception { + CacheConfiguration cc = defaultCacheConfiguration(); + + cc.setCacheMode(PARTITIONED); + cc.setAtomicityMode(ATOMIC); + cc.setSwapEnabled(false); + cc.setWriteBehindEnabled(false); + cc.setNearConfiguration(null); + + CacheJdbcPojoStoreFactory storeFactory = new CacheJdbcPojoStoreFactory(); + storeFactory.setConfiguration(storeConfiguration()); + storeFactory.setDataSource(JdbcConnectionPool.create(DFLT_CONN_URL, "sa", "")); // H2 DataSource + + cc.setCacheStoreFactory(storeFactory); + cc.setReadThrough(true); + cc.setWriteThrough(true); + cc.setLoadPreviousValue(true); + + return cc; + } + + protected void fillSampleDatabase() throws SQLException { + Connection conn = getConnection(); + + PreparedStatement orgStmt = conn.prepareStatement("INSERT INTO Organization(id, name, city) VALUES (?, ?, ?)"); + + for (int i = 0; i < ORGANIZATION_CNT; i++) { + orgStmt.setInt(1, i); + orgStmt.setString(2, "name" + i); + orgStmt.setString(3, "city" + i % 10); + + orgStmt.addBatch(); + } + + orgStmt.executeBatch(); + + U.closeQuiet(orgStmt); + + conn.commit(); + + PreparedStatement prnStmt = conn.prepareStatement("INSERT INTO Person(id, org_id, name) VALUES (?, ?, ?)"); + + for (int i = 0; i < PERSON_CNT; i++) { + prnStmt.setInt(1, i); + prnStmt.setInt(2, i % 100); + prnStmt.setString(3, "name" + i); + + prnStmt.addBatch(); + } + + prnStmt.executeBatch(); + + conn.commit(); + + U.closeQuiet(prnStmt); + + U.closeQuiet(conn); + } + + /** + * @throws Exception If failed. + */ + public void testLoadCache() throws Exception { + fillSampleDatabase(); + + IgniteCache c1 = grid().cache(null); + + info("Cache load started..."); + + c1.loadCache(null); + + info("Cache load finished!"); + + Thread.sleep(1000000000); + } +}