Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 0F3221853B for ; Mon, 23 Nov 2015 19:07:14 +0000 (UTC) Received: (qmail 43472 invoked by uid 500); 23 Nov 2015 19:07:12 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 43422 invoked by uid 500); 23 Nov 2015 19:07:12 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 42826 invoked by uid 99); 23 Nov 2015 19:07:10 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 23 Nov 2015 19:07:10 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 8BD1BDFFDA; Mon, 23 Nov 2015 19:07:10 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.apache.org Date: Mon, 23 Nov 2015 19:07:25 -0000 Message-Id: <3d2b67dd66a4418bb199c1f38316426c@git.apache.org> In-Reply-To: <558777b8adeb41a89dbaf1c6379bf097@git.apache.org> References: <558777b8adeb41a89dbaf1c6379bf097@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [16/27] ignite git commit: IGNITE-1753 Refactored usages of deprecated CacheTypeMetadata to JdbcType. IGNITE-1753 Refactored usages of deprecated CacheTypeMetadata to JdbcType. 
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d71f6129 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d71f6129 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d71f6129 Branch: refs/heads/ignite-single-op-get Commit: d71f6129bc737539e61206c391fc25c776f36242 Parents: 19d2dd0 Author: AKuznetsov Authored: Mon Nov 23 18:20:50 2015 +0700 Committer: AKuznetsov Committed: Mon Nov 23 18:20:50 2015 +0700 ---------------------------------------------------------------------- examples/schema-import/bin/db-init.sql | 3 +- .../org/apache/ignite/schema/CacheConfig.java | 7 +- .../java/org/apache/ignite/schema/Demo.java | 20 +- .../org/apache/ignite/cache/QueryIndex.java | 53 +- .../store/jdbc/CacheAbstractJdbcStore.java | 638 ++++++++++++------- .../store/jdbc/CacheJdbcBlobStoreFactory.java | 14 +- .../cache/store/jdbc/CacheJdbcPojoStore.java | 444 +++++++++---- .../store/jdbc/CacheJdbcPojoStoreFactory.java | 277 +++++++- .../ignite/cache/store/jdbc/JdbcType.java | 255 ++++++++ .../cache/store/jdbc/JdbcTypeDefaultHasher.java | 43 ++ .../ignite/cache/store/jdbc/JdbcTypeField.java | 172 +++++ .../ignite/cache/store/jdbc/JdbcTypeHasher.java | 34 + .../processors/query/GridQueryProcessor.java | 6 +- .../ignite/internal/visor/cache/VisorCache.java | 4 +- .../CacheJdbcPojoStoreAbstractSelfTest.java | 395 ++++++++++++ ...dbcPojoStoreOptimizedMarshallerSelfTest.java | 31 + ...JdbcPojoStorePortableMarshallerSelfTest.java | 85 +++ .../store/jdbc/CacheJdbcPojoStoreTest.java | 200 +++--- ...eJdbcStoreAbstractMultithreadedSelfTest.java | 2 +- .../ignite/testsuites/IgniteCacheTestSuite.java | 6 +- modules/schema-import/README.txt | 176 ++--- .../ignite/schema/generator/CodeGenerator.java | 198 +++--- .../ignite/schema/generator/XmlGenerator.java | 101 +-- .../apache/ignite/schema/model/IndexItem.java | 54 -- .../ignite/schema/model/PojoDescriptor.java | 72 +-- 
.../ignite/schema/model/SchemaDescriptor.java | 6 +- .../schema/parser/DatabaseMetadataParser.java | 12 +- .../apache/ignite/schema/parser/DbTable.java | 37 +- .../parser/dialect/DatabaseMetadataDialect.java | 32 +- .../parser/dialect/JdbcMetadataDialect.java | 22 +- .../parser/dialect/OracleMetadataDialect.java | 24 +- .../apache/ignite/schema/ui/ModalDialog.java | 6 +- .../ignite/schema/ui/SchemaImportApp.java | 13 +- .../schema/test/AbstractSchemaImportTest.java | 4 +- .../schema/test/model/ignite-type-metadata.xml | 610 +++++++++--------- .../yardstick/config/ignite-store-config.xml | 50 +- 36 files changed, 2844 insertions(+), 1262 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/d71f6129/examples/schema-import/bin/db-init.sql ---------------------------------------------------------------------- diff --git a/examples/schema-import/bin/db-init.sql b/examples/schema-import/bin/db-init.sql index f02236a..8a91a6a 100644 --- a/examples/schema-import/bin/db-init.sql +++ b/examples/schema-import/bin/db-init.sql @@ -17,7 +17,8 @@ -- Script of database initialization for Schema Import Demo. 
drop table PERSON; -create table PERSON(id integer not null, first_name varchar(50), last_name varchar(50), salary double not null, PRIMARY KEY(id)); + +create table PERSON(id integer not null PRIMARY KEY, first_name varchar(50), last_name varchar(50), salary double not null); insert into PERSON(id, first_name, last_name, salary) values(1, 'Johannes', 'Kepler', 1000); insert into PERSON(id, first_name, last_name, salary) values(2, 'Galileo', 'Galilei', 2000); http://git-wip-us.apache.org/repos/asf/ignite/blob/d71f6129/examples/schema-import/src/main/java/org/apache/ignite/schema/CacheConfig.java ---------------------------------------------------------------------- diff --git a/examples/schema-import/src/main/java/org/apache/ignite/schema/CacheConfig.java b/examples/schema-import/src/main/java/org/apache/ignite/schema/CacheConfig.java index cb316c5..c5801cc 100644 --- a/examples/schema-import/src/main/java/org/apache/ignite/schema/CacheConfig.java +++ b/examples/schema-import/src/main/java/org/apache/ignite/schema/CacheConfig.java @@ -17,8 +17,7 @@ package org.apache.ignite.schema; -import javax.cache.configuration.Factory; -import org.apache.ignite.cache.store.CacheStore; +import org.apache.ignite.cache.store.jdbc.CacheJdbcPojoStoreFactory; import org.apache.ignite.configuration.CacheConfiguration; /** @@ -31,7 +30,7 @@ public class CacheConfig { * @param name Cache name. * @param storeFactory Cache store factory. 
*/ - public static CacheConfiguration cache(String name, Factory> storeFactory) { + public static CacheConfiguration cache(String name, CacheJdbcPojoStoreFactory storeFactory) { throw new IllegalStateException("Please run Ignite Schema Import Utility as described in README.txt"); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/d71f6129/examples/schema-import/src/main/java/org/apache/ignite/schema/Demo.java ---------------------------------------------------------------------- diff --git a/examples/schema-import/src/main/java/org/apache/ignite/schema/Demo.java b/examples/schema-import/src/main/java/org/apache/ignite/schema/Demo.java index cade7f1..a981f5a 100644 --- a/examples/schema-import/src/main/java/org/apache/ignite/schema/Demo.java +++ b/examples/schema-import/src/main/java/org/apache/ignite/schema/Demo.java @@ -18,13 +18,13 @@ package org.apache.ignite.schema; import javax.cache.Cache; -import javax.cache.configuration.Factory; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteException; import org.apache.ignite.Ignition; -import org.apache.ignite.cache.store.CacheStore; import org.apache.ignite.cache.store.jdbc.CacheJdbcPojoStore; +import org.apache.ignite.cache.store.jdbc.CacheJdbcPojoStoreFactory; +import org.apache.ignite.cache.store.jdbc.dialect.H2Dialect; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.transactions.Transaction; import org.h2.jdbcx.JdbcConnectionPool; @@ -38,16 +38,14 @@ import org.h2.jdbcx.JdbcConnectionPool; */ public class Demo { /** - * Constructs and returns a fully configured instance of a {@link CacheJdbcPojoStore}. + * Constructs and returns a fully configured instance of a {@link CacheJdbcPojoStoreFactory}. 
*/ - private static class H2DemoStoreFactory implements Factory> { - /** {@inheritDoc} */ - @Override public CacheStore create() { - CacheJdbcPojoStore store = new CacheJdbcPojoStore<>(); + private static class H2DemoStoreFactory extends CacheJdbcPojoStoreFactory { + /** Default constructor. */ + H2DemoStoreFactory() { + setDialect(new H2Dialect()); - store.setDataSource(JdbcConnectionPool.create("jdbc:h2:tcp://localhost/~/schema-import/demo", "sa", "")); - - return store; + setDataSource(JdbcConnectionPool.create("jdbc:h2:tcp://localhost/~/schema-import/demo", "sa", "")); } } @@ -144,4 +142,4 @@ public class Demo { System.out.println(">>> Updated person: " + cache.get(key)); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/d71f6129/modules/core/src/main/java/org/apache/ignite/cache/QueryIndex.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/QueryIndex.java b/modules/core/src/main/java/org/apache/ignite/cache/QueryIndex.java index f12044d..af11999 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/QueryIndex.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/QueryIndex.java @@ -50,20 +50,33 @@ public class QueryIndex implements Serializable { /** * Creates single-field sorted ascending index. * - * @param name Field name. + * @param field Field name. */ - public QueryIndex(String name) { - this(name, QueryIndexType.SORTED, true); + public QueryIndex(String field) { + this(field, QueryIndexType.SORTED, true); } /** * Creates single-field sorted index. * - * @param name Field name. + * @param field Field name. * @param asc Ascending flag. */ - public QueryIndex(String name, boolean asc) { - this(name, QueryIndexType.SORTED, asc); + public QueryIndex(String field, boolean asc) { + this(field, QueryIndexType.SORTED, asc); + } + + /** + * Creates single-field sorted index. + * + * @param field Field name. 
+ * @param asc Ascending flag. + * @param name Index name. + */ + public QueryIndex(String field, boolean asc, String name) { + this(field, QueryIndexType.SORTED, asc); + + this.name = name; } /** @@ -71,14 +84,20 @@ public class QueryIndex implements Serializable { * If index is sorted, then ascending sorting is used by default. * To specify sort order, use the next method. * This constructor should also have a corresponding setter method. + * + * @param field Field name. + * @param type Index type. */ public QueryIndex(String field, QueryIndexType type) { this(Arrays.asList(field), type); } /** - * Creates index for one field. The last boolean parameter - * is ignored for non-sorted indexes. + * Creates index for one field. The last boolean parameter is ignored for non-sorted indexes. + * + * @param field Field name. + * @param type Index type. + * @param asc Ascending flag. */ public QueryIndex(String field, QueryIndexType type, boolean asc) { fields = new LinkedHashMap<>(); @@ -88,6 +107,22 @@ public class QueryIndex implements Serializable { } /** + * Creates index for one field. The last boolean parameter is ignored for non-sorted indexes. + * + * @param field Field name. + * @param type Index type. + * @param asc Ascending flag. + * @param name Index name. + */ + public QueryIndex(String field, QueryIndexType type, boolean asc, String name) { + fields = new LinkedHashMap<>(); + fields.put(field, asc); + + this.type = type; + this.name = name; + } + + /** * Creates index for a collection of fields. If index is sorted, fields will be sorted in * ascending order. 
* @@ -189,4 +224,4 @@ public class QueryIndex implements Serializable { public void setIndexType(QueryIndexType type) { this.type = type; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/d71f6129/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java index 6e19234..6dc413b 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java @@ -30,6 +30,8 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.Callable; @@ -66,6 +68,7 @@ import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiInClosure; import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.lifecycle.LifecycleAware; +import org.apache.ignite.marshaller.portable.BinaryMarshaller; import org.apache.ignite.resources.CacheStoreSessionResource; import org.apache.ignite.resources.IgniteInstanceResource; import org.apache.ignite.resources.LoggerResource; @@ -75,6 +78,10 @@ import org.jetbrains.annotations.Nullable; import static java.sql.Statement.EXECUTE_FAILED; import static java.sql.Statement.SUCCESS_NO_INFO; +import static org.apache.ignite.cache.store.jdbc.CacheJdbcPojoStoreFactory.DFLT_BATCH_SIZE; +import static org.apache.ignite.cache.store.jdbc.CacheJdbcPojoStoreFactory.DFLT_WRITE_ATTEMPTS; +import static 
org.apache.ignite.cache.store.jdbc.CacheJdbcPojoStoreFactory.DFLT_PARALLEL_LOAD_CACHE_MINIMUM_THRESHOLD; + /** * Implementation of {@link CacheStore} backed by JDBC. *

@@ -99,35 +106,43 @@ import static java.sql.Statement.SUCCESS_NO_INFO; *

Java Example

*
  *    ...
- *    CacheConfiguration ccfg = new CacheConfiguration<>();
- *
- *    // Configure cache store.
- *    ccfg.setCacheStoreFactory(new FactoryBuilder.SingletonFactory(ConfigurationSnippet.store()));
+ *    // Create store factory.
+ *    CacheJdbcPojoStoreFactory storeFactory = new CacheJdbcPojoStoreFactory();
+ *    storeFactory.setDataSourceBean("your_data_source_name");
+ *    storeFactory.setDialect(new H2Dialect());
+ *    storeFactory.setTypes(array_with_your_types);
+ *    ...
+ *    ccfg.setCacheStoreFactory(storeFactory);
  *    ccfg.setReadThrough(true);
  *    ccfg.setWriteThrough(true);
  *
- *    // Configure cache types metadata.
- *    ccfg.setTypeMetadata(ConfigurationSnippet.typeMetadata());
- *
  *    cfg.setCacheConfiguration(ccfg);
  *    ...
  * 
*/ public abstract class CacheAbstractJdbcStore implements CacheStore, LifecycleAware { - /** Max attempt write count. */ - protected static final int MAX_ATTEMPT_WRITE_COUNT = 2; - - /** Default batch size for put and remove operations. */ - protected static final int DFLT_BATCH_SIZE = 512; - - /** Default batch size for put and remove operations. */ - protected static final int DFLT_PARALLEL_LOAD_CACHE_MINIMUM_THRESHOLD = 512; - /** Connection attribute property name. */ protected static final String ATTR_CONN_PROP = "JDBC_STORE_CONNECTION"; - /** Empty column value. */ - protected static final Object[] EMPTY_COLUMN_VALUE = new Object[] { null }; + /** Built in Java types names. */ + protected static final Collection BUILT_IN_TYPES = new HashSet<>(); + + static { + BUILT_IN_TYPES.add("java.math.BigDecimal"); + BUILT_IN_TYPES.add("java.lang.Boolean"); + BUILT_IN_TYPES.add("java.lang.Byte"); + BUILT_IN_TYPES.add("java.lang.Character"); + BUILT_IN_TYPES.add("java.lang.Double"); + BUILT_IN_TYPES.add("java.util.Date"); + BUILT_IN_TYPES.add("java.sql.Date"); + BUILT_IN_TYPES.add("java.lang.Float"); + BUILT_IN_TYPES.add("java.lang.Integer"); + BUILT_IN_TYPES.add("java.lang.Long"); + BUILT_IN_TYPES.add("java.lang.Short"); + BUILT_IN_TYPES.add("java.lang.String"); + BUILT_IN_TYPES.add("java.sql.Timestamp"); + BUILT_IN_TYPES.add("java.util.UUID"); + } /** Auto-injected store session. */ @CacheStoreSessionResource @@ -135,7 +150,7 @@ public abstract class CacheAbstractJdbcStore implements CacheStore, /** Auto injected ignite instance. */ @IgniteInstanceResource - private Ignite ignite; + protected Ignite ignite; /** Auto-injected logger instance. */ @LoggerResource @@ -151,30 +166,40 @@ public abstract class CacheAbstractJdbcStore implements CacheStore, /** Cache with entry mapping description. (cache name, (key id, mapping description)). */ protected volatile Map> cacheMappings = Collections.emptyMap(); + /** Maximum batch size for writeAll and deleteAll operations. 
*/ + private int batchSize = DFLT_BATCH_SIZE; + /** Database dialect. */ protected JdbcDialect dialect; - /** Max workers thread count. These threads are responsible for load cache. */ - private int maxPoolSz = Runtime.getRuntime().availableProcessors(); + /** Maximum write attempts in case of database error. */ + private int maxWrtAttempts = DFLT_WRITE_ATTEMPTS; - /** Maximum batch size for writeAll and deleteAll operations. */ - private int batchSz = DFLT_BATCH_SIZE; + /** Max workers thread count. These threads are responsible for load cache. */ + private int maxPoolSize = Runtime.getRuntime().availableProcessors(); /** Parallel load cache minimum threshold. If {@code 0} then load sequentially. */ private int parallelLoadCacheMinThreshold = DFLT_PARALLEL_LOAD_CACHE_MINIMUM_THRESHOLD; + /** Types that store could process. */ + private JdbcType[] types; + + /** Hash calculator. */ + protected JdbcTypeHasher hasher = JdbcTypeDefaultHasher.INSTANCE; + /** * Get field value from object for use as query parameter. * * @param cacheName Cache name. * @param typeName Type name. + * @param typeKind Type kind. * @param fieldName Field name. * @param obj Cache object. * @return Field value from object. * @throws CacheException in case of error. */ - @Nullable protected abstract Object extractParameter(@Nullable String cacheName, String typeName, String fieldName, - Object obj) throws CacheException; + @Nullable protected abstract Object extractParameter(@Nullable String cacheName, String typeName, TypeKind typeKind, + String fieldName, Object obj) throws CacheException; /** * Construct object from query result. @@ -182,33 +207,36 @@ public abstract class CacheAbstractJdbcStore implements CacheStore, * @param Type of result object. * @param cacheName Cache name. * @param typeName Type name. - * @param fields Fields descriptors. + * @param typeKind Type kind. + * @param flds Fields descriptors. + * @param hashFlds Field names for hash code calculation. 
* @param loadColIdxs Select query columns index. * @param rs ResultSet. * @return Constructed object. * @throws CacheLoaderException If failed to construct cache object. */ - protected abstract R buildObject(@Nullable String cacheName, String typeName, - Collection fields, Map loadColIdxs, ResultSet rs) + protected abstract R buildObject(@Nullable String cacheName, String typeName, TypeKind typeKind, + JdbcTypeField[] flds, Collection hashFlds, Map loadColIdxs, ResultSet rs) throws CacheLoaderException; /** - * Extract key type id from key object. + * Calculate type ID for object. * - * @param key Key object. - * @return Key type id. - * @throws CacheException If failed to get type key id from object. + * @param obj Object to calculate type ID for. + * @return Type ID. + * @throws CacheException If failed to calculate type ID for given object. */ - protected abstract Object keyTypeId(Object key) throws CacheException; + protected abstract Object typeIdForObject(Object obj) throws CacheException; /** - * Extract key type id from key class name. + * Calculate type ID for given type name. * - * @param type String description of key type. - * @return Key type id. - * @throws CacheException If failed to get type key id from object. + * @param kind If {@code true} then calculate type ID for POJO otherwise for binary object . + * @param typeName String description of type name. + * @return Type ID. + * @throws CacheException If failed to get type ID for given type name. */ - protected abstract Object keyTypeId(String type) throws CacheException; + protected abstract Object typeIdForTypeName(TypeKind kind, String typeName) throws CacheException; /** * Prepare internal store specific builders for provided types metadata. @@ -217,7 +245,7 @@ public abstract class CacheAbstractJdbcStore implements CacheStore, * @param types Collection of types. * @throws CacheException If failed to prepare internal builders for types. 
*/ - protected abstract void prepareBuilders(@Nullable String cacheName, Collection types) + protected abstract void prepareBuilders(@Nullable String cacheName, Collection types) throws CacheException; /** @@ -480,23 +508,23 @@ public abstract class CacheAbstractJdbcStore implements CacheStore, ? em.loadCacheQry : em.loadCacheRangeQuery(lowerBound != null, upperBound != null)); - int ix = 1; + int idx = 1; if (lowerBound != null) for (int i = lowerBound.length; i > 0; i--) for (int j = 0; j < i; j++) - stmt.setObject(ix++, lowerBound[j]); + stmt.setObject(idx++, lowerBound[j]); if (upperBound != null) for (int i = upperBound.length; i > 0; i--) for (int j = 0; j < i; j++) - stmt.setObject(ix++, upperBound[j]); + stmt.setObject(idx++, upperBound[j]); ResultSet rs = stmt.executeQuery(); while (rs.next()) { - K key = buildObject(em.cacheName, em.keyType(), em.keyColumns(), em.loadColIdxs, rs); - V val = buildObject(em.cacheName, em.valueType(), em.valueColumns(), em.loadColIdxs, rs); + K key = buildObject(em.cacheName, em.keyType(), em.keyKind(), em.keyColumns(), em.keyCols, em.loadColIdxs, rs); + V val = buildObject(em.cacheName, em.valueType(), em.valueKind(), em.valueColumns(), null, em.loadColIdxs, rs); clo.apply(key, val); } @@ -527,58 +555,86 @@ public abstract class CacheAbstractJdbcStore implements CacheStore, } /** - * Object is a simple type. + * Checks if type configured properly. * - * @param cls Class. - * @return {@code True} if object is a simple type. - */ - protected static boolean simpleType(Class cls) { - return (Number.class.isAssignableFrom(cls) || String.class.isAssignableFrom(cls) || - java.util.Date.class.isAssignableFrom(cls) || Boolean.class.isAssignableFrom(cls) || - UUID.class.isAssignableFrom(cls)); - } - - /** * @param cacheName Cache name to check mapping for. - * @param clsName Class name. - * @param fields Fields descriptors. - * @throws CacheException If failed to check type metadata. + * @param typeName Type name. 
+ * @param flds Fields descriptors. + * @throws CacheException If failed to check type configuration. */ - private static void checkMapping(@Nullable String cacheName, String clsName, - Collection fields) throws CacheException { + private void checkTypeConfiguration(@Nullable String cacheName, TypeKind kind, String typeName, + JdbcTypeField[] flds) throws CacheException { try { - Class cls = Class.forName(clsName); - - if (simpleType(cls)) { - if (fields.size() != 1) - throw new CacheException("More than one field for simple type [cache name=" + cacheName - + ", type=" + clsName + " ]"); + if (kind == TypeKind.BUILT_IN) { + if (flds.length != 1) + throw new CacheException("More than one field for built in type [cache=" + U.maskName(cacheName) + + ", type=" + typeName + " ]"); - CacheTypeFieldMetadata field = F.first(fields); + JdbcTypeField field = flds[0]; - if (field.getDatabaseName() == null) - throw new CacheException("Missing database name in mapping description [cache name=" + cacheName - + ", type=" + clsName + " ]"); + if (field.getDatabaseFieldName() == null) + throw new CacheException("Missing database name in mapping description [cache=" + + U.maskName(cacheName) + ", type=" + typeName + " ]"); - field.setJavaType(cls); + field.setJavaFieldType(Class.forName(typeName)); } else - for (CacheTypeFieldMetadata field : fields) { - if (field.getDatabaseName() == null) - throw new CacheException("Missing database name in mapping description [cache name=" + cacheName - + ", type=" + clsName + " ]"); - - if (field.getJavaName() == null) - throw new CacheException("Missing field name in mapping description [cache name=" + cacheName - + ", type=" + clsName + " ]"); - - if (field.getJavaType() == null) - throw new CacheException("Missing field type in mapping description [cache name=" + cacheName - + ", type=" + clsName + " ]"); + for (JdbcTypeField field : flds) { + if (field.getDatabaseFieldName() == null) + throw new CacheException("Missing database name in 
mapping description [cache=" + + U.maskName(cacheName) + ", type=" + typeName + " ]"); + + if (field.getJavaFieldName() == null) + throw new CacheException("Missing field name in mapping description [cache=" + + U.maskName(cacheName) + ", type=" + typeName + " ]"); + + if (field.getJavaFieldType() == null) + throw new CacheException("Missing field type in mapping description [cache=" + + U.maskName(cacheName) + ", type=" + typeName + " ]"); } } catch (ClassNotFoundException e) { - throw new CacheException("Failed to find class: " + clsName, e); + throw new CacheException("Failed to find class: " + typeName, e); + } + } + + /** + * For backward compatibility translate old field type descriptors to new format. + * + * @param oldFlds Fields in old format. + * @return Fields in new format. + */ + @Deprecated + private JdbcTypeField[] translateFields(Collection oldFlds) { + JdbcTypeField[] newFlds = new JdbcTypeField[oldFlds.size()]; + + int idx = 0; + + for (CacheTypeFieldMetadata oldField : oldFlds) { + newFlds[idx] = new JdbcTypeField(oldField.getDatabaseType(), oldField.getDatabaseName(), + oldField.getJavaType(), oldField.getJavaName()); + + idx++; + } + + return newFlds; + } + + /** + * @param type Type name to check. + * @return {@code True} if class not found. + */ + protected TypeKind kindForName(String type) { + if (BUILT_IN_TYPES.contains(type)) + return TypeKind.BUILT_IN; + + try { + Class.forName(type); + + return TypeKind.POJO; + } + catch(ClassNotFoundException ignored) { + return TypeKind.BINARY; } } @@ -587,46 +643,104 @@ public abstract class CacheAbstractJdbcStore implements CacheStore, * @return Type mappings for specified cache name. * @throws CacheException If failed to initialize cache mappings. 
*/ - private Map cacheMappings(@Nullable String cacheName) throws CacheException { + private Map getOrCreateCacheMappings(@Nullable String cacheName) throws CacheException { Map entryMappings = cacheMappings.get(cacheName); if (entryMappings != null) return entryMappings; cacheMappingsLock.lock(); - try { entryMappings = cacheMappings.get(cacheName); if (entryMappings != null) return entryMappings; - CacheConfiguration ccfg = ignite().cache(cacheName).getConfiguration(CacheConfiguration.class); + // If no types configured, check CacheTypeMetadata for backward compatibility. + if (types == null) { + CacheConfiguration ccfg = ignite.cache(cacheName).getConfiguration(CacheConfiguration.class); + + Collection oldTypes = ccfg.getTypeMetadata(); + + types = new JdbcType[oldTypes.size()]; + + int idx = 0; + + for (CacheTypeMetadata oldType : oldTypes) { + JdbcType newType = new JdbcType(); - Collection types = ccfg.getTypeMetadata(); + newType.setCacheName(cacheName); - entryMappings = U.newHashMap(types.size()); + newType.setDatabaseSchema(oldType.getDatabaseSchema()); + newType.setDatabaseTable(oldType.getDatabaseTable()); - for (CacheTypeMetadata type : types) { - Object keyTypeId = keyTypeId(type.getKeyType()); + newType.setKeyType(oldType.getKeyType()); + newType.setKeyFields(translateFields(oldType.getKeyFields())); - if (entryMappings.containsKey(keyTypeId)) - throw new CacheException("Key type must be unique in type metadata [cache name=" + cacheName + - ", key type=" + type.getKeyType() + "]"); + newType.setValueType(oldType.getValueType()); + newType.setValueFields(translateFields(oldType.getValueFields())); - checkMapping(cacheName, type.getKeyType(), type.getKeyFields()); - checkMapping(cacheName, type.getValueType(), type.getValueFields()); + types[idx] = newType; - entryMappings.put(keyTypeId(type.getKeyType()), new EntryMapping(cacheName, dialect, type)); + idx++; + } } - Map> mappings = new HashMap<>(cacheMappings); + List cacheTypes = new 
ArrayList<>(types.length); + + for (JdbcType type : types) + if ((cacheName != null && cacheName.equals(type.getCacheName())) || + (cacheName == null && type.getCacheName() == null)) + cacheTypes.add(type); + + entryMappings = U.newHashMap(cacheTypes.size()); + + if (!cacheTypes.isEmpty()) { + boolean binarySupported = ignite.configuration().getMarshaller() instanceof BinaryMarshaller; + + for (JdbcType type : cacheTypes) { + String keyType = type.getKeyType(); + String valType = type.getValueType(); + + TypeKind keyKind = kindForName(keyType); + + if (!binarySupported && keyKind == TypeKind.BINARY) + throw new CacheException("Key type has no class [cache=" + U.maskName(cacheName) + + ", type=" + keyType + "]"); + + checkTypeConfiguration(cacheName, keyKind, keyType, type.getKeyFields()); + + Object keyTypeId = typeIdForTypeName(keyKind, keyType); - mappings.put(cacheName, entryMappings); + if (entryMappings.containsKey(keyTypeId)) + throw new CacheException("Key type must be unique in type metadata [cache=" + + U.maskName(cacheName) + ", type=" + keyType + "]"); - prepareBuilders(cacheName, types); + TypeKind valKind = kindForName(valType); - cacheMappings = mappings; + checkTypeConfiguration(cacheName, valKind, valType, type.getValueFields()); + + entryMappings.put(keyTypeId, new EntryMapping(cacheName, dialect, type, keyKind, valKind)); + + // Add one more binding to binary typeId for POJOs, + // because object could be passed to store in binary format. + if (binarySupported && keyKind == TypeKind.POJO) { + keyTypeId = typeIdForTypeName(TypeKind.BINARY, keyType); + + valKind = valKind == TypeKind.POJO ? 
TypeKind.BINARY : valKind; + + entryMappings.put(keyTypeId, new EntryMapping(cacheName, dialect, type, TypeKind.BINARY, valKind)); + } + } + + Map> mappings = new HashMap<>(cacheMappings); + + mappings.put(cacheName, entryMappings); + + prepareBuilders(cacheName, cacheTypes); + + cacheMappings = mappings; + } return entryMappings; } @@ -637,19 +751,21 @@ public abstract class CacheAbstractJdbcStore implements CacheStore, /** * @param cacheName Cache name. - * @param keyTypeId Key type id. - * @param key Key object. + * @param typeId Type id. * @return Entry mapping. * @throws CacheException If mapping for key was not found. */ - private EntryMapping entryMapping(String cacheName, Object keyTypeId, Object key) throws CacheException { - EntryMapping em = cacheMappings(cacheName).get(keyTypeId); + private EntryMapping entryMapping(String cacheName, Object typeId) throws CacheException { + Map mappings = getOrCreateCacheMappings(cacheName); + + EntryMapping em = mappings.get(typeId); if (em == null) { String maskedCacheName = U.maskName(cacheName); - throw new CacheException("Failed to find mapping description [key=" + key + - ", cache=" + maskedCacheName + "]. Please configure CacheTypeMetadata to associate '" + maskedCacheName + "' with JdbcPojoStore."); + throw new CacheException("Failed to find mapping description [cache=" + maskedCacheName + + ", typeId=" + typeId + "]. 
Please configure JdbcType to associate cache '" + maskedCacheName + + "' with JdbcPojoStore."); } return em; @@ -663,34 +779,37 @@ public abstract class CacheAbstractJdbcStore implements CacheStore, String cacheName = session().cacheName(); try { - pool = Executors.newFixedThreadPool(maxPoolSz); + pool = Executors.newFixedThreadPool(maxPoolSize); Collection> futs = new ArrayList<>(); + Map mappings = getOrCreateCacheMappings(cacheName); + if (args != null && args.length > 0) { if (args.length % 2 != 0) throw new CacheLoaderException("Expected even number of arguments, but found: " + args.length); if (log.isDebugEnabled()) - log.debug("Start loading entries from db using user queries from arguments"); + log.debug("Start loading entries from db using user queries from arguments..."); for (int i = 0; i < args.length; i += 2) { String keyType = args[i].toString(); String selQry = args[i + 1].toString(); - EntryMapping em = entryMapping(cacheName, keyTypeId(keyType), keyType); + EntryMapping em = entryMapping(cacheName, typeIdForTypeName(kindForName(keyType), keyType)); futs.add(pool.submit(new LoadCacheCustomQueryWorker<>(em, selQry, clo))); } } else { - Collection entryMappings = cacheMappings(session().cacheName()).values(); + Collection entryMappings = mappings.values(); for (EntryMapping em : entryMappings) { if (parallelLoadCacheMinThreshold > 0) { - log.debug("Multithread loading entries from db [cache name=" + cacheName + - ", key type=" + em.keyType() + " ]"); + if (log.isDebugEnabled()) + log.debug("Multithread loading entries from db [cache=" + U.maskName(cacheName) + + ", keyType=" + em.keyType() + " ]"); Connection conn = null; @@ -738,8 +857,8 @@ public abstract class CacheAbstractJdbcStore implements CacheStore, } else { if (log.isDebugEnabled()) - log.debug("Single thread loading entries from db [cache name=" + cacheName + - ", key type=" + em.keyType() + " ]"); + log.debug("Single thread loading entries from db [cache=" + U.maskName(cacheName) + + ", 
keyType=" + em.keyType() + " ]"); futs.add(pool.submit(loadCacheFull(em, clo))); } @@ -750,10 +869,10 @@ public abstract class CacheAbstractJdbcStore implements CacheStore, U.get(fut); if (log.isDebugEnabled()) - log.debug("Cache loaded from db: " + cacheName); + log.debug("Cache loaded from db: " + U.maskName(cacheName)); } catch (IgniteCheckedException e) { - throw new CacheLoaderException("Failed to load cache: " + cacheName, e.getCause()); + throw new CacheLoaderException("Failed to load cache: " + U.maskName(cacheName), e.getCause()); } finally { U.shutdownNow(getClass(), pool, log); @@ -764,7 +883,7 @@ public abstract class CacheAbstractJdbcStore implements CacheStore, @Nullable @Override public V load(K key) throws CacheLoaderException { assert key != null; - EntryMapping em = entryMapping(session().cacheName(), keyTypeId(key), key); + EntryMapping em = entryMapping(session().cacheName(), typeIdForObject(key)); if (log.isDebugEnabled()) log.debug("Load value from db [table= " + em.fullTableName() + ", key=" + key + "]"); @@ -783,7 +902,7 @@ public abstract class CacheAbstractJdbcStore implements CacheStore, ResultSet rs = stmt.executeQuery(); if (rs.next()) - return buildObject(em.cacheName, em.valueType(), em.valueColumns(), em.loadColIdxs, rs); + return buildObject(em.cacheName, em.valueType(), em.valueKind(), em.valueColumns(), null, em.loadColIdxs, rs); } catch (SQLException e) { throw new CacheLoaderException("Failed to load object [table=" + em.fullTableName() + @@ -807,14 +926,14 @@ public abstract class CacheAbstractJdbcStore implements CacheStore, String cacheName = session().cacheName(); - Map> workers = U.newHashMap(cacheMappings(cacheName).size()); + Map> workers = U.newHashMap(getOrCreateCacheMappings(cacheName).size()); Map res = new HashMap<>(); for (K key : keys) { - Object keyTypeId = keyTypeId(key); + Object keyTypeId = typeIdForObject(key); - EntryMapping em = entryMapping(cacheName, keyTypeId, key); + EntryMapping em = 
entryMapping(cacheName, keyTypeId); LoadWorker worker = workers.get(keyTypeId); @@ -852,7 +971,7 @@ public abstract class CacheAbstractJdbcStore implements CacheStore, try { CacheWriterException we = null; - for (int attempt = 0; attempt < MAX_ATTEMPT_WRITE_COUNT; attempt++) { + for (int attempt = 0; attempt < maxWrtAttempts; attempt++) { int paramIdx = fillValueParameters(updStmt, 1, em, entry.getValue()); fillKeyParameters(updStmt, paramIdx, em, entry.getKey()); @@ -921,7 +1040,7 @@ public abstract class CacheAbstractJdbcStore implements CacheStore, K key = entry.getKey(); - EntryMapping em = entryMapping(session().cacheName(), keyTypeId(key), key); + EntryMapping em = entryMapping(session().cacheName(), typeIdForObject(key)); if (log.isDebugEnabled()) log.debug("Start write entry to database [table=" + em.fullTableName() + ", entry=" + entry + "]"); @@ -937,9 +1056,9 @@ public abstract class CacheAbstractJdbcStore implements CacheStore, try { stmt = conn.prepareStatement(em.mergeQry); - int i = fillKeyParameters(stmt, em, key); + int idx = fillKeyParameters(stmt, em, key); - fillValueParameters(stmt, i, em, entry.getValue()); + fillValueParameters(stmt, idx, em, entry.getValue()); int updCnt = stmt.executeUpdate(); @@ -1010,15 +1129,15 @@ public abstract class CacheAbstractJdbcStore implements CacheStore, for (Cache.Entry entry : entries) { K key = entry.getKey(); - Object keyTypeId = keyTypeId(key); + Object keyTypeId = typeIdForObject(key); - em = entryMapping(cacheName, keyTypeId, key); + em = entryMapping(cacheName, keyTypeId); if (currKeyTypeId == null || !currKeyTypeId.equals(keyTypeId)) { if (mergeStmt != null) { if (log.isDebugEnabled()) - log.debug("Write entries to db [cache name=" + cacheName + - ", key type=" + em.keyType() + ", count=" + prepared + "]"); + log.debug("Write entries to db [cache=" + U.maskName(cacheName) + + ", keyType=" + em.keyType() + ", cnt=" + prepared + "]"); executeBatch(em, mergeStmt, "writeAll", fromIdx, prepared, 
lazyEntries); @@ -1034,16 +1153,16 @@ public abstract class CacheAbstractJdbcStore implements CacheStore, prepared = 0; } - int i = fillKeyParameters(mergeStmt, em, key); + int idx = fillKeyParameters(mergeStmt, em, key); - fillValueParameters(mergeStmt, i, em, entry.getValue()); + fillValueParameters(mergeStmt, idx, em, entry.getValue()); mergeStmt.addBatch(); - if (++prepared % batchSz == 0) { + if (++prepared % batchSize == 0) { if (log.isDebugEnabled()) - log.debug("Write entries to db [cache name=" + cacheName + - ", key type=" + em.keyType() + ", count=" + prepared + "]"); + log.debug("Write entries to db [cache=" + U.maskName(cacheName) + + ", keyType=" + em.keyType() + ", cnt=" + prepared + "]"); executeBatch(em, mergeStmt, "writeAll", fromIdx, prepared, lazyEntries); @@ -1053,10 +1172,10 @@ public abstract class CacheAbstractJdbcStore implements CacheStore, } } - if (mergeStmt != null && prepared % batchSz != 0) { + if (mergeStmt != null && prepared % batchSize != 0) { if (log.isDebugEnabled()) - log.debug("Write entries to db [cache name=" + cacheName + - ", key type=" + em.keyType() + ", count=" + prepared + "]"); + log.debug("Write entries to db [cache=" + U.maskName(cacheName) + + ", keyType=" + em.keyType() + ", cnt=" + prepared + "]"); executeBatch(em, mergeStmt, "writeAll", fromIdx, prepared, lazyEntries); @@ -1067,8 +1186,9 @@ public abstract class CacheAbstractJdbcStore implements CacheStore, } } else { - log.debug("Write entries to db one by one using update and insert statements [cache name=" + - cacheName + ", count=" + entries.size() + "]"); + if (log.isDebugEnabled()) + log.debug("Write entries to db one by one using update and insert statements [cache=" + + U.maskName(cacheName) + ", cnt=" + entries.size() + "]"); PreparedStatement insStmt = null; @@ -1078,9 +1198,9 @@ public abstract class CacheAbstractJdbcStore implements CacheStore, for (Cache.Entry entry : entries) { K key = entry.getKey(); - Object keyTypeId = keyTypeId(key); + Object 
keyTypeId = typeIdForObject(key); - EntryMapping em = entryMapping(cacheName, keyTypeId, key); + EntryMapping em = entryMapping(cacheName, keyTypeId); if (currKeyTypeId == null || !currKeyTypeId.equals(keyTypeId)) { U.closeQuiet(insStmt); @@ -1116,7 +1236,7 @@ public abstract class CacheAbstractJdbcStore implements CacheStore, @Override public void delete(Object key) throws CacheWriterException { assert key != null; - EntryMapping em = entryMapping(session().cacheName(), keyTypeId(key), key); + EntryMapping em = entryMapping(session().cacheName(), typeIdForObject(key)); if (log.isDebugEnabled()) log.debug("Remove value from db [table=" + em.fullTableName() + ", key=" + key + "]"); @@ -1220,9 +1340,9 @@ public abstract class CacheAbstractJdbcStore implements CacheStore, int fromIdx = 0, prepared = 0; for (Object key : keys) { - Object keyTypeId = keyTypeId(key); + Object keyTypeId = typeIdForObject(key); - em = entryMapping(cacheName, keyTypeId, key); + em = entryMapping(cacheName, keyTypeId); if (delStmt == null) { delStmt = conn.prepareStatement(em.remQry); @@ -1232,8 +1352,8 @@ public abstract class CacheAbstractJdbcStore implements CacheStore, if (!currKeyTypeId.equals(keyTypeId)) { if (log.isDebugEnabled()) - log.debug("Delete entries from db [cache name=" + cacheName + - ", key type=" + em.keyType() + ", count=" + prepared + "]"); + log.debug("Delete entries from db [cache=" + U.maskName(cacheName) + + ", keyType=" + em.keyType() + ", cnt=" + prepared + "]"); executeBatch(em, delStmt, "deleteAll", fromIdx, prepared, lazyKeys); @@ -1248,10 +1368,10 @@ public abstract class CacheAbstractJdbcStore implements CacheStore, delStmt.addBatch(); - if (++prepared % batchSz == 0) { + if (++prepared % batchSize == 0) { if (log.isDebugEnabled()) - log.debug("Delete entries from db [cache name=" + cacheName + - ", key type=" + em.keyType() + ", count=" + prepared + "]"); + log.debug("Delete entries from db [cache=" + U.maskName(cacheName) + + ", keyType=" + em.keyType() + 
", cnt=" + prepared + "]"); executeBatch(em, delStmt, "deleteAll", fromIdx, prepared, lazyKeys); @@ -1261,10 +1381,10 @@ public abstract class CacheAbstractJdbcStore implements CacheStore, } } - if (delStmt != null && prepared % batchSz != 0) { + if (delStmt != null && prepared % batchSize != 0) { if (log.isDebugEnabled()) - log.debug("Delete entries from db [cache name=" + cacheName + - ", key type=" + em.keyType() + ", count=" + prepared + "]"); + log.debug("Delete entries from db [cache=" + U.maskName(cacheName) + + ", keyType=" + em.keyType() + ", cnt=" + prepared + "]"); executeBatch(em, delStmt, "deleteAll", fromIdx, prepared, lazyKeys); } @@ -1281,17 +1401,17 @@ public abstract class CacheAbstractJdbcStore implements CacheStore, * Sets the value of the designated parameter using the given object. * * @param stmt Prepare statement. - * @param i Index for parameters. + * @param idx Index for parameters. * @param field Field descriptor. * @param fieldVal Field value. * @throws CacheException If failed to set statement parameter. 
*/ - protected void fillParameter(PreparedStatement stmt, int i, CacheTypeFieldMetadata field, @Nullable Object fieldVal) + protected void fillParameter(PreparedStatement stmt, int idx, JdbcTypeField field, @Nullable Object fieldVal) throws CacheException { try { if (fieldVal != null) { - if (field.getJavaType() == UUID.class) { - switch (field.getDatabaseType()) { + if (field.getJavaFieldType() == UUID.class) { + switch (field.getDatabaseFieldType()) { case Types.BINARY: fieldVal = U.uuidToBytes((UUID)fieldVal); @@ -1304,13 +1424,13 @@ public abstract class CacheAbstractJdbcStore implements CacheStore, } } - stmt.setObject(i, fieldVal); + stmt.setObject(idx, fieldVal); } else - stmt.setNull(i, field.getDatabaseType()); + stmt.setNull(idx, field.getDatabaseFieldType()); } catch (SQLException e) { - throw new CacheException("Failed to set statement parameter name: " + field.getDatabaseName(), e); + throw new CacheException("Failed to set statement parameter name: " + field.getDatabaseFieldName(), e); } } @@ -1324,8 +1444,8 @@ public abstract class CacheAbstractJdbcStore implements CacheStore, */ protected int fillKeyParameters(PreparedStatement stmt, int idx, EntryMapping em, Object key) throws CacheException { - for (CacheTypeFieldMetadata field : em.keyColumns()) { - Object fieldVal = extractParameter(em.cacheName, em.keyType(), field.getJavaName(), key); + for (JdbcTypeField field : em.keyColumns()) { + Object fieldVal = extractParameter(em.cacheName, em.keyType(), em.keyKind(), field.getJavaFieldName(), key); fillParameter(stmt, idx++, field, fieldVal); } @@ -1354,8 +1474,8 @@ public abstract class CacheAbstractJdbcStore implements CacheStore, */ protected int fillValueParameters(PreparedStatement stmt, int idx, EntryMapping em, Object val) throws CacheWriterException { - for (CacheTypeFieldMetadata field : em.uniqValFields) { - Object fieldVal = extractParameter(em.cacheName, em.valueType(), field.getJavaName(), val); + for (JdbcTypeField field : 
em.uniqValFlds) { + Object fieldVal = extractParameter(em.cacheName, em.valueType(), em.valueKind(), field.getJavaFieldName(), val); fillParameter(stmt, idx++, field, fieldVal); } @@ -1401,16 +1521,70 @@ public abstract class CacheAbstractJdbcStore implements CacheStore, * @return Max workers thread count. */ public int getMaximumPoolSize() { - return maxPoolSz; + return maxPoolSize; } /** * Set Max workers thread count. These threads are responsible for execute query. * - * @param maxPoolSz Max workers thread count. + * @param maxPoolSize Max workers thread count. */ - public void setMaximumPoolSize(int maxPoolSz) { - this.maxPoolSz = maxPoolSz; + public void setMaximumPoolSize(int maxPoolSize) { + this.maxPoolSize = maxPoolSize; + } + + /** + * Gets maximum number of write attempts in case of database error. + * + * @return Maximum number of write attempts. + */ + public int getMaximumWriteAttempts() { + return maxWrtAttempts; + } + + /** + * Sets maximum number of write attempts in case of database error. + * + * @param maxWrtAttempts Number of write attempts. + */ + public void setMaximumWriteAttempts(int maxWrtAttempts) { + this.maxWrtAttempts = maxWrtAttempts; + } + + /** + * Gets types known by store. + * + * @return Types known by store. + */ + public JdbcType[] getTypes() { + return types; + } + + /** + * Sets store configurations. + * + * @param types Store should process. + */ + public void setTypes(JdbcType... types) { + this.types = types; + } + + /** + * Gets hash code calculator. + * + * @return Hash code calculator. + */ + public JdbcTypeHasher getHasher() { + return hasher; + } + + /** + * Sets hash code calculator. + * + * @param hasher Hash code calculator. + */ + public void setHasher(JdbcTypeHasher hasher) { + this.hasher = hasher; } /** @@ -1419,16 +1593,16 @@ public abstract class CacheAbstractJdbcStore implements CacheStore, * @return Maximum batch size. 
*/ public int getBatchSize() { - return batchSz; + return batchSize; } /** * Set maximum batch size for write and delete operations. * - * @param batchSz Maximum batch size. + * @param batchSize Maximum batch size. */ - public void setBatchSize(int batchSz) { - this.batchSz = batchSz; + public void setBatchSize(int batchSize) { + this.batchSize = batchSize; } /** @@ -1464,6 +1638,18 @@ public abstract class CacheAbstractJdbcStore implements CacheStore, } /** + * Type kind. + */ + protected enum TypeKind { + /** Type is known as Java built in type, like {@link String} */ + BUILT_IN, + /** Class for this type is available. */ + POJO, + /** Class for this type is not available. */ + BINARY + } + + /** * Entry mapping description. */ protected static class EntryMapping { @@ -1510,10 +1696,16 @@ public abstract class CacheAbstractJdbcStore implements CacheStore, private final Map loadColIdxs; /** Unique value fields. */ - private final Collection uniqValFields; + private final Collection uniqValFlds; /** Type metadata. */ - private final CacheTypeMetadata typeMeta; + private final JdbcType typeMeta; + + /** Key type kind. */ + private final TypeKind keyKind; + + /** Value type kind. */ + private final TypeKind valKind; /** Full table name. */ private final String fullTblName; @@ -1523,22 +1715,27 @@ public abstract class CacheAbstractJdbcStore implements CacheStore, * @param dialect JDBC dialect. * @param typeMeta Type metadata. 
*/ - public EntryMapping(@Nullable String cacheName, JdbcDialect dialect, CacheTypeMetadata typeMeta) { + public EntryMapping(@Nullable String cacheName, JdbcDialect dialect, JdbcType typeMeta, + TypeKind keyKind, TypeKind valKind) { this.cacheName = cacheName; this.dialect = dialect; this.typeMeta = typeMeta; - Collection keyFields = typeMeta.getKeyFields(); + this.keyKind = keyKind; + + this.valKind = valKind; + + JdbcTypeField[] keyFields = typeMeta.getKeyFields(); - Collection valFields = typeMeta.getValueFields(); + JdbcTypeField[] valFields = typeMeta.getValueFields(); - keyCols = databaseColumns(keyFields); + keyCols = databaseColumns(F.asList(keyFields)); - uniqValFields = F.view(valFields, new IgnitePredicate() { - @Override public boolean apply(CacheTypeFieldMetadata col) { - return !keyCols.contains(col.getDatabaseName()); + uniqValFlds = F.view(F.asList(valFields), new IgnitePredicate() { + @Override public boolean apply(JdbcTypeField col) { + return !keyCols.contains(col.getDatabaseFieldName()); } }); @@ -1548,7 +1745,7 @@ public abstract class CacheAbstractJdbcStore implements CacheStore, fullTblName = F.isEmpty(schema) ? tblName : schema + "." + tblName; - Collection uniqValCols = databaseColumns(uniqValFields); + Collection uniqValCols = databaseColumns(uniqValFlds); cols = F.concat(false, keyCols, uniqValCols); @@ -1579,21 +1776,49 @@ public abstract class CacheAbstractJdbcStore implements CacheStore, } /** - * Extract database column names from {@link CacheTypeFieldMetadata}. + * Extract database column names from {@link JdbcTypeField}. * - * @param dsc collection of {@link CacheTypeFieldMetadata}. + * @param dsc collection of {@link JdbcTypeField}. * @return Collection with database column names. 
*/ - private static Collection databaseColumns(Collection dsc) { - return F.transform(dsc, new C1() { + private static Collection databaseColumns(Collection dsc) { + return F.transform(dsc, new C1() { /** {@inheritDoc} */ - @Override public String apply(CacheTypeFieldMetadata col) { - return col.getDatabaseName(); + @Override public String apply(JdbcTypeField col) { + return col.getDatabaseFieldName(); } }); } /** + * @return Key type. + */ + protected String keyType() { + return typeMeta.getKeyType(); + } + + /** + * @return Key type kind. + */ + protected TypeKind keyKind() { + return keyKind; + } + + /** + * @return Value type. + */ + protected String valueType() { + return typeMeta.getValueType(); + } + + /** + * @return Value type kind. + */ + protected TypeKind valueKind() { + return valKind; + } + + /** * Construct query for select values with key count less or equal {@code maxKeysPerStmt} * * @param keyCnt Key count. @@ -1623,25 +1848,11 @@ public abstract class CacheAbstractJdbcStore implements CacheStore, } /** - * @return Key type. - */ - protected String keyType() { - return typeMeta.getKeyType(); - } - - /** - * @return Value type. - */ - protected String valueType() { - return typeMeta.getValueType(); - } - - /** * Gets key columns. * * @return Key columns. */ - protected Collection keyColumns() { + protected JdbcTypeField[] keyColumns() { return typeMeta.getKeyFields(); } @@ -1650,7 +1861,7 @@ public abstract class CacheAbstractJdbcStore implements CacheStore, * * @return Value columns. 
*/ - protected Collection valueColumns() { + protected JdbcTypeField[] valueColumns() { return typeMeta.getValueFields(); } @@ -1694,8 +1905,8 @@ public abstract class CacheAbstractJdbcStore implements CacheStore, /** {@inheritDoc} */ @Override public Void call() throws Exception { if (log.isDebugEnabled()) - log.debug("Load cache using custom query [cache name= " + em.cacheName + - ", key type=" + em.keyType() + ", query=" + qry + "]"); + log.debug("Load cache using custom query [cache= " + U.maskName(em.cacheName) + + ", keyType=" + em.keyType() + ", query=" + qry + "]"); Connection conn = null; @@ -1716,8 +1927,8 @@ public abstract class CacheAbstractJdbcStore implements CacheStore, colIdxs.put(meta.getColumnLabel(i), i); while (rs.next()) { - K1 key = buildObject(em.cacheName, em.keyType(), em.keyColumns(), colIdxs, rs); - V1 val = buildObject(em.cacheName, em.valueType(), em.valueColumns(), colIdxs, rs); + K1 key = buildObject(em.cacheName, em.keyType(), em.keyKind(), em.keyColumns(), em.keyCols, colIdxs, rs); + V1 val = buildObject(em.cacheName, em.valueType(), em.valueKind(), em.valueColumns(), null, colIdxs, rs); clo.apply(key, val); } @@ -1790,8 +2001,7 @@ public abstract class CacheAbstractJdbcStore implements CacheStore, /** {@inheritDoc} */ @Override public Map call() throws Exception { if (log.isDebugEnabled()) - log.debug("Load values from db [table= " + em.fullTableName() + - ", key count=" + keys.size() + "]"); + log.debug("Load values from db [table= " + em.fullTableName() + ", keysCnt=" + keys.size() + "]"); PreparedStatement stmt = null; @@ -1801,8 +2011,8 @@ public abstract class CacheAbstractJdbcStore implements CacheStore, int idx = 1; for (Object key : keys) - for (CacheTypeFieldMetadata field : em.keyColumns()) { - Object fieldVal = extractParameter(em.cacheName, em.keyType(), field.getJavaName(), key); + for (JdbcTypeField field : em.keyColumns()) { + Object fieldVal = extractParameter(em.cacheName, em.keyType(), em.keyKind(), 
field.getJavaFieldName(), key); fillParameter(stmt, idx++, field, fieldVal); } @@ -1812,8 +2022,8 @@ public abstract class CacheAbstractJdbcStore implements CacheStore, Map entries = U.newHashMap(keys.size()); while (rs.next()) { - K1 key = buildObject(em.cacheName, em.keyType(), em.keyColumns(), em.loadColIdxs, rs); - V1 val = buildObject(em.cacheName, em.valueType(), em.valueColumns(), em.loadColIdxs, rs); + K1 key = buildObject(em.cacheName, em.keyType(), em.keyKind(), em.keyColumns(), em.keyCols, em.loadColIdxs, rs); + V1 val = buildObject(em.cacheName, em.valueType(), em.valueKind(), em.valueColumns(), null, em.loadColIdxs, rs); entries.put(key, val); } http://git-wip-us.apache.org/repos/asf/ignite/blob/d71f6129/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcBlobStoreFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcBlobStoreFactory.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcBlobStoreFactory.java index 74ab30b..6a46619 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcBlobStoreFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcBlobStoreFactory.java @@ -35,7 +35,7 @@ import org.apache.ignite.resources.SpringApplicationContextResource; * *

Spring Example

*
- *     <bean id= "simpleDataSource" class="org.h2.jdbcx.JdbcDataSource"/>
+ *     <bean id= "myDataSource" class="org.h2.jdbcx.JdbcDataSource"/>
  *
  *     <bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
  *          ...
@@ -46,7 +46,7 @@ import org.apache.ignite.resources.SpringApplicationContextResource;
  *                      <property name="cacheStoreFactory">
  *                          <bean class="org.apache.ignite.cache.store.jdbc.CacheJdbcBlobStoreFactory">
  *                              <property name="user" value = "Ignite" />
- *                              <property name="dataSourceBean" value = "simpleDataSource" />
+ *                              <property name="dataSourceBean" value = "myDataSource" />
  *                          </bean>
  *                      </property>
  *                  </bean>
@@ -99,7 +99,7 @@ public class CacheJdbcBlobStoreFactory implements Factory create() {
@@ -118,7 +118,7 @@ public class CacheJdbcBlobStoreFactory implements Factory implements Factory