ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From akuznet...@apache.org
Subject incubator-ignite git commit: # IGNITE-32 Store: reworked store initialization with type metadata information.
Date Wed, 28 Jan 2015 17:38:40 GMT
Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-32 61ac2c4e2 -> 977f61fa5


# IGNITE-32 Store: reworked store initialization with type metadata information.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/977f61fa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/977f61fa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/977f61fa

Branch: refs/heads/ignite-32
Commit: 977f61fa577b9d99ca567faa897c4a9a77f7aa90
Parents: 61ac2c4
Author: AKuznetsov <akuznetsov@gridgain.com>
Authored: Thu Jan 29 00:38:31 2015 +0700
Committer: AKuznetsov <akuznetsov@gridgain.com>
Committed: Thu Jan 29 00:38:31 2015 +0700

----------------------------------------------------------------------
 .../ignite/cache/store/jdbc/JdbcCacheStore.java | 103 +++++++++++--------
 .../cache/store/jdbc/JdbcPojoCacheStore.java    |  10 +-
 .../PojoCacheStoreMultitreadedSelfTest.java     |   7 +-
 .../store/jdbc/PojoCacheStoreSelfTest.java      |  18 +++-
 .../apache/ignite/schema/ui/SchemaLoadApp.java  |   4 +-
 5 files changed, 84 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/977f61fa/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/JdbcCacheStore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/JdbcCacheStore.java
b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/JdbcCacheStore.java
index 6e34443..1f1084d 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/JdbcCacheStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/JdbcCacheStore.java
@@ -36,6 +36,7 @@ import java.sql.*;
 import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.*;
+import java.util.concurrent.locks.*;
 
 /**
  * Base {@link CacheStore} implementation backed by JDBC. This implementation stores objects
in underlying database
@@ -98,6 +99,10 @@ public abstract class JdbcCacheStore<K, V> extends CacheStore<K,
V> {
     @GridToStringExclude
     private final CountDownLatch initLatch = new CountDownLatch(1);
 
+    /** Init lock. */
+    @GridToStringExclude
+    private final Lock initLock = new ReentrantLock();
+
     /** Successful initialization flag. */
     private boolean initOk;
 
@@ -114,11 +119,8 @@ public abstract class JdbcCacheStore<K, V> extends CacheStore<K,
V> {
     @GridToStringExclude
     protected String passwd;
 
-    /** Type mapping description. */
-    protected Collection<CacheQueryTypeMetadata> typeMetadata;
-
-    /** Cache with query by type. */
-    protected Map<IgniteBiTuple<String, Object>, EntryMapping> entryMappings;
+    /** Cache with entry mapping description. (cache name, (key id, mapping description)).
*/
+    protected Map<Integer, Map<Object, EntryMapping>> cacheMappings = new ConcurrentHashMap<>();
 
     /** Database dialect. */
     protected JdbcDialect dialect;
@@ -176,7 +178,7 @@ public abstract class JdbcCacheStore<K, V> extends CacheStore<K,
V> {
      *
      * @throws CacheException If failed to initialize.
      */
-    protected abstract void buildTypeCache() throws CacheException;
+    protected abstract void buildTypeCache(Collection<CacheQueryTypeMetadata> typeMetadata)
throws CacheException;
 
     /**
      * Perform dialect resolution.
@@ -222,11 +224,21 @@ public abstract class JdbcCacheStore<K, V> extends CacheStore<K,
V> {
     }
 
     /**
+     *
+     * @return Cache key id.
+     */
+    protected Integer cacheKeyId() {
+        String cacheName = session().cacheName();
+
+        return cacheName != null ? cacheName.hashCode() : 0;
+    }
+
+    /**
      * Initializes store.
      *
      * @throws CacheException If failed to initialize.
      */
-    protected void init() throws CacheException {
+    private void init() throws CacheException {
         if (initLatch.getCount() > 0) {
             if (initGuard.compareAndSet(false, true)) {
                 if (log.isDebugEnabled())
@@ -235,14 +247,9 @@ public abstract class JdbcCacheStore<K, V> extends CacheStore<K,
V> {
                 if (dataSrc == null && F.isEmpty(connUrl))
                     throw new CacheException("Failed to initialize cache store (connection
is not provided).");
 
-                if (dialect == null)
-                    dialect = resolveDialect();
-
                 try {
-                    if (typeMetadata == null)
-                        throw new CacheException("Failed to initialize cache store (mapping
description is not provided).");
-
-                    buildTypeCache();
+                    if (dialect == null)
+                        dialect = resolveDialect();
 
                     initOk = true;
                 }
@@ -264,6 +271,31 @@ public abstract class JdbcCacheStore<K, V> extends CacheStore<K,
V> {
 
         if (!initOk)
             throw new CacheException("Cache store was not properly initialized.");
+
+        Integer cacheKey = cacheKeyId();
+
+        if (!cacheMappings.containsKey(cacheKey)) {
+            initLock.lock();
+
+            try {
+                if (!cacheMappings.containsKey(cacheKey)) {
+                    Collection<CacheQueryTypeMetadata> typeMetadata =
+                        ignite().cache(session().cacheName()).configuration().getQueryConfiguration().getTypeMetadata();
+
+                    Map<Object, EntryMapping> entryMappings = U.newHashMap(typeMetadata.size());
+
+                    for (CacheQueryTypeMetadata type : typeMetadata)
+                        entryMappings.put(keyId(type.getKeyType()), new EntryMapping(dialect,
type));
+
+                    cacheMappings.put(cacheKey, Collections.unmodifiableMap(entryMappings));
+
+                    buildTypeCache(typeMetadata);
+                }
+            }
+            finally {
+                initLock.unlock();
+            }
+        }
     }
 
     /**
@@ -439,10 +471,15 @@ public abstract class JdbcCacheStore<K, V> extends CacheStore<K,
V> {
      * @throws CacheException if mapping for key was not found.
      */
     private EntryMapping entryMapping(Object keyId, Object key) throws CacheException {
-        EntryMapping em = entryMappings.get(new IgniteBiTuple<>(session().cacheName(),
keyId));
+        String cacheName = session().cacheName();
+
+        init();
+
+        EntryMapping em = cacheMappings.get(cacheKeyId()).get(keyId);
 
         if (em == null)
-            throw new CacheException("Failed to find mapping description for key: " + key);
+            throw new CacheException("Failed to find mapping description for key: " + key
+
+                " in cache: " + (cacheName != null ? cacheName : "<default>"));
 
         return em;
     }
@@ -451,8 +488,6 @@ public abstract class JdbcCacheStore<K, V> extends CacheStore<K,
V> {
     @Override public void loadCache(final IgniteBiInClosure<K, V> clo, @Nullable Object...
args)
         throws CacheLoaderException {
         try {
-            init();
-
             ExecutorService pool = Executors.newFixedThreadPool(maxPoolSz);
 
             Collection<Future<?>> futs = new ArrayList<>();
@@ -475,10 +510,12 @@ public abstract class JdbcCacheStore<K, V> extends CacheStore<K,
V> {
                 }
             }
             else {
+                init();
+
                 if (log.isDebugEnabled())
                     log.debug("Start loading all cache types entries from db");
 
-                for (EntryMapping em : entryMappings.values()) {
+                for (EntryMapping em : cacheMappings.get(cacheKeyId()).values()) {
                     if (parallelLoadCacheMinThreshold > 0) {
                         Connection conn = null;
 
@@ -541,8 +578,6 @@ public abstract class JdbcCacheStore<K, V> extends CacheStore<K,
V> {
     @Nullable @Override public V load(K key) throws CacheLoaderException {
         assert key != null;
 
-        init();
-
         EntryMapping em = entryMapping(keyId(key), key);
 
         if (log.isDebugEnabled())
@@ -578,14 +613,12 @@ public abstract class JdbcCacheStore<K, V> extends CacheStore<K,
V> {
     @Override public Map<K, V> loadAll(Iterable<? extends K> keys) throws CacheLoaderException
{
         assert keys != null;
 
-        init();
-
         Connection conn = null;
 
         try {
             conn = connection();
 
-            Map<Object, LoadWorker<K, V>> workers = U.newHashMap(entryMappings.size());
+            Map<Object, LoadWorker<K, V>> workers = U.newHashMap(cacheMappings.get(cacheKeyId()).size());
 
             Map<K, V> res = new HashMap<>();
 
@@ -625,8 +658,6 @@ public abstract class JdbcCacheStore<K, V> extends CacheStore<K,
V> {
     @Override public void write(Cache.Entry<? extends K, ? extends V> entry) throws
CacheWriterException {
         assert entry != null;
 
-        init();
-
         K key = entry.getKey();
 
         EntryMapping em = entryMapping(keyId(key), key);
@@ -685,15 +716,13 @@ public abstract class JdbcCacheStore<K, V> extends CacheStore<K,
V> {
         throws CacheWriterException {
         assert entries != null;
 
-        init();
-
         Connection conn = null;
 
         try {
             conn = connection();
 
             if (dialect.hasMerge()) {
-                Map<Object, PreparedStatement> stmts = U.newHashMap(entryMappings.size());
+                Map<Object, PreparedStatement> stmts = U.newHashMap(cacheMappings.get(cacheKeyId()).size());
 
                 Object prevKeyId = null;
 
@@ -740,7 +769,8 @@ public abstract class JdbcCacheStore<K, V> extends CacheStore<K,
V> {
                     U.closeQuiet(st);
             }
             else {
-                Map<Object, T2<PreparedStatement, PreparedStatement>> stmts =
U.newHashMap(entryMappings.size());
+                Map<Object, T2<PreparedStatement, PreparedStatement>> stmts =
+                    U.newHashMap(cacheMappings.get(cacheKeyId()).size());
 
                 for (Cache.Entry<? extends K, ? extends V> entry : entries) {
                     K key = entry.getKey();
@@ -795,8 +825,6 @@ public abstract class JdbcCacheStore<K, V> extends CacheStore<K,
V> {
     @Override public void delete(Object key) throws CacheWriterException {
         assert key != null;
 
-        init();
-
         EntryMapping em = entryMapping(keyId(key), key);
 
         if (log.isDebugEnabled())
@@ -832,7 +860,7 @@ public abstract class JdbcCacheStore<K, V> extends CacheStore<K,
V> {
         try {
             conn = connection();
 
-            Map<Object, PreparedStatement> stmts = U.newHashMap(entryMappings.size());
+            Map<Object, PreparedStatement> stmts = U.newHashMap(cacheMappings.get(cacheKeyId()).size());
 
             Object prevKeyId = null;
 
@@ -1004,15 +1032,6 @@ public abstract class JdbcCacheStore<K, V> extends CacheStore<K,
V> {
     }
 
     /**
-     * Set type mapping description.
-     *
-     * @param typeMetadata Type mapping description.
-     */
-    public void setTypeMetadata(Collection<CacheQueryTypeMetadata> typeMetadata) {
-        this.typeMetadata = typeMetadata;
-    }
-
-    /**
      * Get database dialect.
      *
      * @return Database dialect.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/977f61fa/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/JdbcPojoCacheStore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/JdbcPojoCacheStore.java
b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/JdbcPojoCacheStore.java
index 8bc82bc..012dec6 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/JdbcPojoCacheStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/JdbcPojoCacheStore.java
@@ -20,7 +20,6 @@ package org.apache.ignite.cache.store.jdbc;
 import org.apache.ignite.cache.query.*;
 import org.apache.ignite.cache.store.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.lang.*;
 import org.jetbrains.annotations.*;
 
 import javax.cache.*;
@@ -135,9 +134,7 @@ public class JdbcPojoCacheStore extends JdbcCacheStore<Object, Object>
{
     protected Map<String, PojoMethodsCache> mtdsCache;
 
     /** {@inheritDoc} */
-    @Override protected void buildTypeCache() throws CacheException {
-        entryMappings = U.newHashMap(typeMetadata.size());
-
+    @Override protected void buildTypeCache(Collection<CacheQueryTypeMetadata> typeMetadata)
throws CacheException {
         mtdsCache = U.newHashMap(typeMetadata.size() * 2);
 
         for (CacheQueryTypeMetadata type : typeMetadata) {
@@ -145,14 +142,9 @@ public class JdbcPojoCacheStore extends JdbcCacheStore<Object, Object>
{
 
             mtdsCache.put(type.getKeyType(), keyCache);
 
-            entryMappings.put(new IgniteBiTuple<String, Object>(null, keyId(type.getKeyType())),
-                new EntryMapping(dialect, type));
-
             mtdsCache.put(type.getType(), new PojoMethodsCache(type.getType(), type.getValueDescriptors()));
         }
 
-        entryMappings = Collections.unmodifiableMap(entryMappings);
-
         mtdsCache = Collections.unmodifiableMap(mtdsCache);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/977f61fa/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/PojoCacheStoreMultitreadedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/PojoCacheStoreMultitreadedSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/PojoCacheStoreMultitreadedSelfTest.java
index 0f2ca13..d59e3f7 100644
--- a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/PojoCacheStoreMultitreadedSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/PojoCacheStoreMultitreadedSelfTest.java
@@ -63,10 +63,9 @@ public class PojoCacheStoreMultitreadedSelfTest extends AbstractCacheStoreMultit
 
             springCtx.refresh();
 
-            Collection<CacheQueryTypeMetadata> typeMetadata =
-                springCtx.getBeansOfType(CacheQueryTypeMetadata.class).values();
-
-            store.setTypeMetadata(typeMetadata);
+// TODO IGNITE-32 FIXME
+//            Collection<CacheQueryTypeMetadata> typeMetadata =
+//                springCtx.getBeansOfType(CacheQueryTypeMetadata.class).values();
         }
         catch (BeansException e) {
             if (X.hasCause(e, ClassNotFoundException.class))

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/977f61fa/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/PojoCacheStoreSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/PojoCacheStoreSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/PojoCacheStoreSelfTest.java
index 2eccfa5..534b362 100644
--- a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/PojoCacheStoreSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/PojoCacheStoreSelfTest.java
@@ -20,6 +20,7 @@ package org.apache.ignite.cache.store.jdbc;
 import org.apache.ignite.*;
 import org.apache.ignite.cache.query.*;
 import org.apache.ignite.cache.store.*;
+import org.apache.ignite.cache.store.jdbc.dialect.*;
 import org.apache.ignite.cache.store.jdbc.model.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.util.typedef.*;
@@ -104,7 +105,22 @@ public class PojoCacheStoreSelfTest extends GridCommonAbstractTest {
             Collection<CacheQueryTypeMetadata> typeMetadata =
                 springCtx.getBeansOfType(CacheQueryTypeMetadata.class).values();
 
-            store.setTypeMetadata(typeMetadata);
+            Map<Integer, Map<Object, JdbcCacheStore.EntryMapping>> cacheMappings
= new ConcurrentHashMap<>();
+
+            JdbcDialect dialect = store.resolveDialect();
+
+            GridTestUtils.setFieldValue(store, JdbcCacheStore.class, "dialect", dialect);
+
+            Map<Object, JdbcCacheStore.EntryMapping> entryMappings = U.newHashMap(typeMetadata.size());
+
+            for (CacheQueryTypeMetadata type : typeMetadata)
+                entryMappings.put(store.keyId(type.getKeyType()), new JdbcCacheStore.EntryMapping(dialect,
type));
+
+            store.buildTypeCache(typeMetadata);
+
+            cacheMappings.put(0, Collections.unmodifiableMap(entryMappings));
+
+            GridTestUtils.setFieldValue(store, JdbcCacheStore.class, "cacheMappings", cacheMappings);
         }
         catch (BeansException e) {
             if (X.hasCause(e, ClassNotFoundException.class))

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/977f61fa/modules/schema-load/src/main/java/org/apache/ignite/schema/ui/SchemaLoadApp.java
----------------------------------------------------------------------
diff --git a/modules/schema-load/src/main/java/org/apache/ignite/schema/ui/SchemaLoadApp.java
b/modules/schema-load/src/main/java/org/apache/ignite/schema/ui/SchemaLoadApp.java
index 889a5ef..04b01cf 100644
--- a/modules/schema-load/src/main/java/org/apache/ignite/schema/ui/SchemaLoadApp.java
+++ b/modules/schema-load/src/main/java/org/apache/ignite/schema/ui/SchemaLoadApp.java
@@ -670,10 +670,10 @@ public class SchemaLoadApp extends Application {
         pojosTbl = tableView("Tables not found in database", useCol, keyClsCol, valClsCol);
 
         TableColumn<PojoField, Boolean> useFldCol = booleanColumn("Use", "use",
-            "If checked then this field will used for XML and POJO generation");
+            "Check to use this field for XML and POJO generation");
 
         TableColumn<PojoField, Boolean> keyCol = booleanColumn("Key", "key",
-            "If checked then this field will be part of key object");
+            "Check to include this field into key object");
 
         TableColumn<PojoField, Boolean> akCol = booleanColumn("AK", "affinityKey",
             "Check to annotate filed with @CacheAffinityKeyMapped annotation in generated
POJO class\n" +


Mime
View raw message