ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From akuznet...@apache.org
Subject [1/2] incubator-ignite git commit: IGNITE-32 Fixed access to store session from different threads.
Date Fri, 13 Feb 2015 07:05:55 GMT
Repository: incubator-ignite
Updated Branches:
  refs/heads/sprint-1 322408f01 -> 4563cce86


IGNITE-32 Fixed access to store session from different threads.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/e9d31150
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/e9d31150
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/e9d31150

Branch: refs/heads/sprint-1
Commit: e9d311504f76a28fdf7ccc1781ff697e48d1555d
Parents: 43a7391
Author: AKuznetsov <akuznetsov@gridgain.com>
Authored: Fri Feb 13 14:05:18 2015 +0700
Committer: AKuznetsov <akuznetsov@gridgain.com>
Committed: Fri Feb 13 14:05:18 2015 +0700

----------------------------------------------------------------------
 .../store/jdbc/CacheAbstractJdbcStore.java      | 100 +++++++++++--------
 .../cache/store/jdbc/CacheJdbcPojoStore.java    |   8 +-
 .../store/jdbc/CacheJdbcPojoStoreTest.java      |   8 +-
 3 files changed, 67 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e9d31150/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java
b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java
index ab21fe1..c0350eb 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java
@@ -120,25 +120,27 @@ public abstract class CacheAbstractJdbcStore<K, V> extends CacheStore<K,
V> impl
     /**
      * Get field value from object.
      *
+     * @param cacheName Cache name.
      * @param typeName Type name.
      * @param fieldName Field name.
      * @param obj Cache object.
      * @return Field value from object.
      */
-    @Nullable protected abstract Object extractField(String typeName, String fieldName, Object
obj)
+    @Nullable protected abstract Object extractField(String cacheName, String typeName, String
fieldName, Object obj)
         throws CacheException;
 
     /**
      * Construct object from query result.
      *
      * @param <R> Type of result object.
+     * @param cacheName Cache name.
      * @param typeName Type name.
      * @param fields Fields descriptors.
      * @param loadColIdxs Select query columns index.
      * @param rs ResultSet.
      * @return Constructed object.
      */
-    protected abstract <R> R buildObject(String typeName, Collection<CacheTypeFieldMetadata>
fields,
+    protected abstract <R> R buildObject(String cacheName, String typeName, Collection<CacheTypeFieldMetadata>
fields,
         Map<String, Integer> loadColIdxs, ResultSet rs) throws CacheLoaderException;
 
     /**
@@ -417,8 +419,8 @@ public abstract class CacheAbstractJdbcStore<K, V> extends CacheStore<K,
V> impl
                     ResultSet rs = stmt.executeQuery();
 
                     while (rs.next()) {
-                        K key = buildObject(em.keyType(), em.keyColumns(), em.loadColIdxs,
rs);
-                        V val = buildObject(em.valueType(), em.valueColumns(), em.loadColIdxs,
rs);
+                        K key = buildObject(em.cacheName, em.keyType(), em.keyColumns(),
em.loadColIdxs, rs);
+                        V val = buildObject(em.cacheName, em.valueType(), em.valueColumns(),
em.loadColIdxs, rs);
 
                         clo.apply(key, val);
                     }
@@ -466,13 +468,13 @@ public abstract class CacheAbstractJdbcStore<K, V> extends CacheStore<K,
V> impl
             if (entryMappings != null)
                 return entryMappings;
 
-            Collection<CacheTypeMetadata> types = ignite().cache(session().cacheName()).configuration()
+            Collection<CacheTypeMetadata> types = ignite().cache(cacheName).configuration()
                 .getTypeMetadata();
 
             entryMappings = U.newHashMap(types.size());
 
             for (CacheTypeMetadata type : types)
-                entryMappings.put(keyTypeId(type.getKeyType()), new EntryMapping(dialect,
type));
+                entryMappings.put(keyTypeId(type.getKeyType()), new EntryMapping(cacheName,
dialect, type));
 
             Map<String, Map<Object, EntryMapping>> mappings = new HashMap<>(cacheMappings);
 
@@ -490,14 +492,13 @@ public abstract class CacheAbstractJdbcStore<K, V> extends CacheStore<K,
V> impl
     }
 
     /**
+     * @param cacheName Cache name.
      * @param keyTypeId Key type id.
      * @param key Key object.
      * @return Entry mapping.
      * @throws CacheException if mapping for key was not found.
      */
-    private EntryMapping entryMapping(Object keyTypeId, Object key) throws CacheException
{
-        String cacheName = session().cacheName();
-
+    private EntryMapping entryMapping(String cacheName, Object keyTypeId, Object key) throws
CacheException {
         EntryMapping em = cacheMappings(cacheName).get(keyTypeId);
 
         if (em == null)
@@ -522,12 +523,14 @@ public abstract class CacheAbstractJdbcStore<K, V> extends CacheStore<K,
V> impl
                 if (log.isDebugEnabled())
                     log.debug("Start loading entries from db using user queries from arguments");
 
+                String cacheName = session().cacheName();
+
                 for (int i = 0; i < args.length; i += 2) {
                     String keyType = args[i].toString();
 
                     String selQry = args[i + 1].toString();
 
-                    EntryMapping em = entryMapping(keyTypeId(keyType), keyType);
+                    EntryMapping em = entryMapping(cacheName, keyTypeId(keyType), keyType);
 
                     futs.add(pool.submit(new LoadCacheCustomQueryWorker<>(em, selQry,
clo)));
                 }
@@ -601,7 +604,7 @@ public abstract class CacheAbstractJdbcStore<K, V> extends CacheStore<K,
V> impl
     @Nullable @Override public V load(K key) throws CacheLoaderException {
         assert key != null;
 
-        EntryMapping em = entryMapping(keyTypeId(key), key);
+        EntryMapping em = entryMapping(session().cacheName(), keyTypeId(key), key);
 
         if (log.isDebugEnabled())
             log.debug("Start load value from database [table= " + em.fullTableName() + ",
key=" + key + "]");
@@ -620,7 +623,7 @@ public abstract class CacheAbstractJdbcStore<K, V> extends CacheStore<K,
V> impl
             ResultSet rs = stmt.executeQuery();
 
             if (rs.next())
-                return buildObject(em.valueType(), em.valueColumns(), em.loadColIdxs, rs);
+                return buildObject(em.cacheName, em.valueType(), em.valueColumns(), em.loadColIdxs,
rs);
         }
         catch (SQLException e) {
             throw new CacheLoaderException("Failed to load object [table=" + em.fullTableName()
+
@@ -642,14 +645,16 @@ public abstract class CacheAbstractJdbcStore<K, V> extends CacheStore<K,
V> impl
         try {
             conn = connection();
 
-            Map<Object, LoadWorker<K, V>> workers = U.newHashMap(cacheMappings(session().cacheName()).size());
+            String cacheName = session().cacheName();
+
+            Map<Object, LoadWorker<K, V>> workers = U.newHashMap(cacheMappings(cacheName).size());
 
             Map<K, V> res = new HashMap<>();
 
             for (K key : keys) {
                 Object keyTypeId = keyTypeId(key);
 
-                EntryMapping em = entryMapping(keyTypeId, key);
+                EntryMapping em = entryMapping(cacheName, keyTypeId, key);
 
                 LoadWorker<K, V> worker = workers.get(keyTypeId);
 
@@ -755,7 +760,7 @@ public abstract class CacheAbstractJdbcStore<K, V> extends CacheStore<K,
V> impl
 
         K key = entry.getKey();
 
-        EntryMapping em = entryMapping(keyTypeId(key), key);
+        EntryMapping em = entryMapping(session().cacheName(), keyTypeId(key), key);
 
         if (log.isDebugEnabled())
             log.debug("Start write entry to database [table=" + em.fullTableName() + ", entry="
+ entry + "]");
@@ -825,6 +830,8 @@ public abstract class CacheAbstractJdbcStore<K, V> extends CacheStore<K,
V> impl
 
             Object currKeyTypeId = null;
 
+            String cacheName = session().cacheName();
+
             if (dialect.hasMerge()) {
                 PreparedStatement mergeStmt = null;
 
@@ -844,7 +851,7 @@ public abstract class CacheAbstractJdbcStore<K, V> extends CacheStore<K,
V> impl
 
                         Object keyTypeId = keyTypeId(key);
 
-                        em = entryMapping(keyTypeId, key);
+                        em = entryMapping(cacheName, keyTypeId, key);
 
                         if (currKeyTypeId == null || !currKeyTypeId.equals(keyTypeId)) {
                             if (mergeStmt != null) {
@@ -891,7 +898,7 @@ public abstract class CacheAbstractJdbcStore<K, V> extends CacheStore<K,
V> impl
 
                         Object keyTypeId = keyTypeId(key);
 
-                        EntryMapping em = entryMapping(keyTypeId, key);
+                        EntryMapping em = entryMapping(cacheName, keyTypeId, key);
 
                         if (currKeyTypeId == null || !currKeyTypeId.equals(keyTypeId)) {
                             U.closeQuiet(insStmt);
@@ -927,7 +934,7 @@ public abstract class CacheAbstractJdbcStore<K, V> extends CacheStore<K,
V> impl
     @Override public void delete(Object key) throws CacheWriterException {
         assert key != null;
 
-        EntryMapping em = entryMapping(keyTypeId(key), key);
+        EntryMapping em = entryMapping(session().cacheName(), keyTypeId(key), key);
 
         if (log.isDebugEnabled())
             log.debug("Start remove value from database [table=" + em.fullTableName() + ",
key=" + key + "]");
@@ -1027,10 +1034,12 @@ public abstract class CacheAbstractJdbcStore<K, V> extends CacheStore<K,
V> impl
 
             int fromIdx = 0, prepared = 0;
 
+            String cachName = session().cacheName();
+
             for (Object key : keys) {
                 Object keyTypeId = keyTypeId(key);
 
-                em = entryMapping(keyTypeId, key);
+                em = entryMapping(cachName, keyTypeId, key);
 
                 if (delStmt == null) {
                     delStmt = conn.prepareStatement(em.remQry);
@@ -1082,7 +1091,7 @@ public abstract class CacheAbstractJdbcStore<K, V> extends CacheStore<K,
V> impl
     protected int fillKeyParameters(PreparedStatement stmt, int i, EntryMapping em,
         Object key) throws CacheException {
         for (CacheTypeFieldMetadata field : em.keyColumns()) {
-            Object fieldVal = extractField(em.keyType(), field.getJavaName(), key);
+            Object fieldVal = extractField(em.cacheName, em.keyType(), field.getJavaName(),
key);
 
             try {
                 if (fieldVal != null)
@@ -1110,28 +1119,28 @@ public abstract class CacheAbstractJdbcStore<K, V> extends CacheStore<K,
V> impl
 
     /**
      * @param stmt Prepare statement.
-     * @param i Start index for parameters.
-     * @param m Type mapping description.
+     * @param idx Start index for parameters.
+     * @param em Type mapping description.
      * @param val Value object.
      * @return Next index for parameters.
      */
-    protected int fillValueParameters(PreparedStatement stmt, int i, EntryMapping m, Object
val)
+    protected int fillValueParameters(PreparedStatement stmt, int idx, EntryMapping em, Object
val)
         throws CacheWriterException {
-        for (CacheTypeFieldMetadata field : m.uniqValFields) {
-            Object fieldVal = extractField(m.valueType(), field.getJavaName(), val);
+        for (CacheTypeFieldMetadata field : em.uniqValFields) {
+            Object fieldVal = extractField(em.cacheName, em.valueType(), field.getJavaName(),
val);
 
             try {
                 if (fieldVal != null)
-                    stmt.setObject(i++, fieldVal);
+                    stmt.setObject(idx++, fieldVal);
                 else
-                    stmt.setNull(i++, field.getDatabaseType());
+                    stmt.setNull(idx++, field.getDatabaseType());
             }
             catch (SQLException e) {
                 throw new CacheWriterException("Failed to set statement parameter name: "
+ field.getDatabaseName(), e);
             }
         }
 
-        return i;
+        return idx;
     }
 
     /**
@@ -1224,35 +1233,38 @@ public abstract class CacheAbstractJdbcStore<K, V> extends CacheStore<K,
V> impl
      * Entry mapping description.
      */
     protected static class EntryMapping {
+        /** Cache name. */
+        private final String cacheName;
+
         /** Database dialect. */
         private final JdbcDialect dialect;
 
         /** Select border for range queries. */
-        protected final String loadCacheSelRangeQry;
+        private final String loadCacheSelRangeQry;
 
         /** Select all items query. */
-        protected final String loadCacheQry;
+        private final String loadCacheQry;
 
         /** Select item query. */
-        protected final String loadQrySingle;
+        private final String loadQrySingle;
 
         /** Select items query. */
         private final String loadQry;
 
         /** Merge item(s) query. */
-        protected final String mergeQry;
+        private final String mergeQry;
 
         /** Update item query. */
-        protected final String insQry;
+        private final String insQry;
 
         /** Update item query. */
-        protected final String updQry;
+        private final String updQry;
 
         /** Remove item(s) query. */
-        protected final String remQry;
+        private final String remQry;
 
         /** Max key count for load query per statement. */
-        protected final int maxKeysPerStmt;
+        private final int maxKeysPerStmt;
 
         /** Database key columns. */
         private final Collection<String> keyCols;
@@ -1273,9 +1285,13 @@ public abstract class CacheAbstractJdbcStore<K, V> extends CacheStore<K,
V> impl
         private final String fullTblName;
 
         /**
+         * @param cacheName Cache name.
+         * @param dialect JDBC dialect.
          * @param typeMeta Type metadata.
          */
-        public EntryMapping(JdbcDialect dialect, CacheTypeMetadata typeMeta) {
+        public EntryMapping(String cacheName, JdbcDialect dialect, CacheTypeMetadata typeMeta)
{
+            this.cacheName = cacheName;
+
             this.dialect = dialect;
 
             this.typeMeta = typeMeta;
@@ -1456,8 +1472,8 @@ public abstract class CacheAbstractJdbcStore<K, V> extends CacheStore<K,
V> impl
                     colIdxs.put(meta.getColumnLabel(i), i);
 
                 while (rs.next()) {
-                    K1 key = buildObject(em.keyType(), em.keyColumns(), colIdxs, rs);
-                    V1 val = buildObject(em.valueType(), em.valueColumns(), colIdxs, rs);
+                    K1 key = buildObject(em.cacheName, em.keyType(), em.keyColumns(), colIdxs,
rs);
+                    V1 val = buildObject(em.cacheName, em.valueType(), em.valueColumns(),
colIdxs, rs);
 
                     clo.apply(key, val);
                 }
@@ -1538,7 +1554,7 @@ public abstract class CacheAbstractJdbcStore<K, V> extends CacheStore<K,
V> impl
 
                 for (Object key : keys)
                     for (CacheTypeFieldMetadata field : em.keyColumns()) {
-                        Object fieldVal = extractField(em.keyType(), field.getJavaName(),
key);
+                        Object fieldVal = extractField(em.cacheName, em.keyType(), field.getJavaName(),
key);
 
                         if (fieldVal != null)
                             stmt.setObject(i++, fieldVal);
@@ -1551,8 +1567,8 @@ public abstract class CacheAbstractJdbcStore<K, V> extends CacheStore<K,
V> impl
                 Map<K1, V1> entries = U.newHashMap(keys.size());
 
                 while (rs.next()) {
-                    K1 key = buildObject(em.keyType(), em.keyColumns(), em.loadColIdxs, rs);
-                    V1 val = buildObject(em.valueType(), em.valueColumns(), em.loadColIdxs,
rs);
+                    K1 key = buildObject(em.cacheName, em.keyType(), em.keyColumns(), em.loadColIdxs,
rs);
+                    V1 val = buildObject(em.cacheName, em.valueType(), em.valueColumns(),
em.loadColIdxs, rs);
 
                     entries.put(key, val);
                 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e9d31150/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStore.java
b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStore.java
index ed1846b..8687d90 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStore.java
@@ -153,9 +153,9 @@ public class CacheJdbcPojoStore extends CacheAbstractJdbcStore<Object,
Object> {
     }
 
     /** {@inheritDoc} */
-    @Override protected <R> R buildObject(String typeName, Collection<CacheTypeFieldMetadata>
fields,
+    @Override protected <R> R buildObject(String cacheName, String typeName, Collection<CacheTypeFieldMetadata>
fields,
         Map<String, Integer> loadColIdxs, ResultSet rs) throws CacheLoaderException
{
-        PojoMethodsCache mc = mtdsCache.get(session().cacheName()).get(typeName);
+        PojoMethodsCache mc = mtdsCache.get(cacheName).get(typeName);
 
         Object obj = mc.newInstance();
 
@@ -176,10 +176,10 @@ public class CacheJdbcPojoStore extends CacheAbstractJdbcStore<Object,
Object> {
     }
 
     /** {@inheritDoc} */
-    @Nullable @Override protected Object extractField(String typeName, String fieldName,
Object obj)
+    @Nullable @Override protected Object extractField(String cacheName, String typeName,
String fieldName, Object obj)
         throws CacheException {
         try {
-            PojoMethodsCache mc = mtdsCache.get(session().cacheName()).get(typeName);
+            PojoMethodsCache mc = mtdsCache.get(cacheName).get(typeName);
 
             return mc.getters.get(fieldName).invoke(obj);
         }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e9d31150/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreTest.java
b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreTest.java
index 1b665f7..f7dc4e1 100644
--- a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreTest.java
@@ -51,7 +51,7 @@ import static org.apache.ignite.testframework.junits.cache.GridAbstractCacheStor
  * Class for {@code PojoCacheStore} tests.
  */
 public class CacheJdbcPojoStoreTest extends GridCommonAbstractTest {
-    /** Default connection URL (value is <tt>jdbc:h2:mem:jdbcCacheStore;DB_CLOSE_DELAY=-1</tt>).
*/
+    /** DB connection URL. */
     private static final String DFLT_CONN_URL = "jdbc:h2:mem:autoCacheStore;DB_CLOSE_DELAY=-1";
 
     /** Default config with mapping. */
@@ -143,7 +143,8 @@ public class CacheJdbcPojoStoreTest extends GridCommonAbstractTest {
             Map<Object, CacheAbstractJdbcStore.EntryMapping> entryMappings = U.newHashMap(typeMeta.size());
 
             for (CacheTypeMetadata type : typeMeta)
-                entryMappings.put(store.keyTypeId(type.getKeyType()), new CacheAbstractJdbcStore.EntryMapping(dialect,
type));
+                entryMappings.put(store.keyTypeId(type.getKeyType()),
+                    new CacheAbstractJdbcStore.EntryMapping(null, dialect, type));
 
             store.prepareBuilders(null, typeMeta);
 
@@ -183,7 +184,8 @@ public class CacheJdbcPojoStoreTest extends GridCommonAbstractTest {
 
         CacheTypeMetadata typeMeta = GridTestUtils.getFieldValue(em, CacheAbstractJdbcStore.EntryMapping.class,
"typeMeta");
 
-        cacheMappings.get(null).put(OrganizationKey.class, new CacheAbstractJdbcStore.EntryMapping(dialect,
typeMeta));
+        cacheMappings.get(null).put(OrganizationKey.class,
+            new CacheAbstractJdbcStore.EntryMapping(null, dialect, typeMeta));
 
         Connection conn = store.openConnection(false);
 


Mime
View raw message