ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sevdoki...@apache.org
Subject [04/15] incubator-ignite git commit: # Bug fix: GridCacheProcessor.publicCache() doesn't return dynamic created caches. Also GridCacheProcessor was simplified.
Date Fri, 13 Mar 2015 09:52:11 GMT
# Bug fix: GridCacheProcessor.publicCache() doesn't return dynamic created caches. Also GridCacheProcessor
was simplified.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/f08510a7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/f08510a7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/f08510a7

Branch: refs/heads/ignite-45
Commit: f08510a7c5893141d33199922e1a5f7d0154b43c
Parents: 63817ab
Author: sevdokimov <sevdokimov@gridgain.com>
Authored: Thu Mar 12 14:29:22 2015 +0300
Committer: sevdokimov <sevdokimov@gridgain.com>
Committed: Thu Mar 12 14:29:22 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheProcessor.java    | 53 +++++------
 .../processors/cache/IgniteCacheProxy.java      | 94 ++++++++++----------
 2 files changed, 75 insertions(+), 72 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f08510a7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 758a4f8..0dd69cc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -85,13 +85,10 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     private final Map<String, GridCacheAdapter<?, ?>> caches;
 
     /** Map of proxies. */
-    private final Map<String, GridCache<?, ?>> proxies;
-
-    /** Map of proxies. */
     private final Map<String, IgniteCacheProxy<?, ?>> jCacheProxies;
 
     /** Map of public proxies, i.e. proxies which could be returned to the user. */
-    private final Map<String, GridCache<?, ?>> publicProxies;
+    private volatile List<GridCache<?, ?>> publicProxies;
 
     /** Map of preload finish futures grouped by preload order. */
     private final NavigableMap<Integer, IgniteInternalFuture<?>> preloadFuts;
@@ -127,8 +124,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         super(ctx);
 
         caches = new LinkedHashMap<>();
-        proxies = new HashMap<>();
-        publicProxies = new HashMap<>();
         jCacheProxies = new HashMap<>();
         preloadFuts = new TreeMap<>();
 
@@ -639,8 +634,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         for (Map.Entry<String, GridCacheAdapter<?, ?>> e : caches.entrySet())
{
             GridCacheAdapter cache = e.getValue();
 
-            proxies.put(e.getKey(), new GridCacheProxyImpl(cache.context(), cache, null));
-
             jCacheProxies.put(e.getKey(), new IgniteCacheProxy(cache.context(), cache, null,
false));
         }
 
@@ -696,8 +689,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
                 startCache(cache);
 
-                proxies.put(name, new GridCacheProxyImpl(ctx, cache, null));
-
                 jCacheProxies.put(name, new IgniteCacheProxy(ctx, cache, null, false));
             }
         }
@@ -708,14 +699,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
             }
         });
 
-        // Internal caches which should not be returned to user.
-        for (Map.Entry<String, GridCacheAdapter<?, ?>> e : caches.entrySet())
{
-            GridCacheAdapter cache = e.getValue();
-
-            if (!sysCaches.contains(e.getKey()))
-                publicProxies.put(e.getKey(), new GridCacheProxyImpl(cache.context(), cache,
null));
-        }
-
         // Must call onKernalStart on shared managers after creation of fetched caches.
         for (GridCacheSharedManager<?, ?> mgr : sharedCtx.managers())
             mgr.onKernalStart();
@@ -769,9 +752,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         if (ctx.config().isDaemon())
             return;
 
-        for (String cacheName : stopSeq) {
+        for (String cacheName : stopSeq)
             stopCache(caches.get(cacheName), cancel);
-        }
 
         List<? extends GridCacheSharedManager<?, ?>> mgrs = sharedCtx.managers();
 
@@ -1991,14 +1973,20 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         if (log.isDebugEnabled())
             log.debug("Getting cache for name: " + name);
 
-        return (GridCache<K, V>)proxies.get(name);
+        IgniteCacheProxy<K, V> jcache = (IgniteCacheProxy<K, V>)jCacheProxies.get(name);
+
+        return jcache == null ? null : jcache.legacyProxy();
     }
 
     /**
      * @return All configured cache instances.
      */
     public Collection<GridCache<?, ?>> caches() {
-        return proxies.values();
+        return F.viewReadOnly(jCacheProxies.values(), new IgniteClosure<IgniteCacheProxy<?,
?>, GridCache<?, ?>>() {
+            @Override public GridCache<?, ?> apply(IgniteCacheProxy<?, ?> entries)
{
+                return entries.legacyProxy();
+            }
+        });
     }
 
     /**
@@ -2060,12 +2048,12 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         if (sysCaches.contains(name))
             throw new IllegalStateException("Failed to get cache because it is system cache:
" + name);
 
-        GridCache<K, V> cache = (GridCache<K, V>)publicProxies.get(name);
+        IgniteCacheProxy<K, V> jcache = (IgniteCacheProxy<K, V>)jCacheProxies.get(name);
 
-        if (cache == null)
+        if (jcache == null)
             throw new IllegalArgumentException("Cache is not configured: " + name);
 
-        return cache;
+        return jcache.legacyProxy();
     }
 
     /**
@@ -2129,7 +2117,20 @@ public class GridCacheProcessor extends GridProcessorAdapter {
      * @return All configured public cache instances.
      */
     public Collection<GridCache<?, ?>> publicCaches() {
-        return publicProxies.values();
+        List<GridCache<?, ?>> res = publicProxies;
+
+        if (res == null) {
+            res = new ArrayList<>(jCacheProxies.size());
+
+            for (IgniteCacheProxy<?, ?> proxy : jCacheProxies.values()) {
+                if (!sysCaches.contains(proxy.getName()))
+                    res.add(proxy.legacyProxy());
+            }
+
+            publicProxies = res;
+        }
+
+        return res;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f08510a7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index 45093cc..17af9e5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -63,6 +63,10 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K,
V
     /** Projection. */
     private GridCacheProjectionImpl<K, V> prj;
 
+    /** */
+    @GridToStringExclude
+    private GridCacheProxyImpl<K, V> legacyProxy;
+
     /**
      * Empty constructor required for {@link Externalizable}.
      */
@@ -92,6 +96,8 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K,
V
         this.prj = prj;
 
         gate = ctx.gate();
+
+        legacyProxy = new GridCacheProxyImpl<K, V>(ctx, delegate, prj);
     }
 
     /**
@@ -258,8 +264,8 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K,
V
         }
     }
 
-    private IgniteBiPredicate<K,V> acceptAll() {
-        return new IgniteBiPredicate<K,V>() {
+    private IgniteBiPredicate<K, V> acceptAll() {
+        return new IgniteBiPredicate<K, V>() {
             @Override public boolean apply(K k, V v) {
                 return true;
             }
@@ -272,12 +278,12 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K,
V
      * @return Cursor.
      */
     @SuppressWarnings("unchecked")
-    private QueryCursor<Entry<K,V>> doQuery(Query filter, @Nullable ClusterGroup
grp) {
-        final CacheQuery<Map.Entry<K,V>> qry;
-        final CacheQueryFuture<Map.Entry<K,V>> fut;
+    private QueryCursor<Entry<K, V>> doQuery(Query filter, @Nullable ClusterGroup
grp) {
+        final CacheQuery<Map.Entry<K, V>> qry;
+        final CacheQueryFuture<Map.Entry<K, V>> fut;
 
         if (filter instanceof ScanQuery) {
-            IgniteBiPredicate<K,V> p = ((ScanQuery)filter).getFilter();
+            IgniteBiPredicate<K, V> p = ((ScanQuery)filter).getFilter();
 
             qry = delegate.queries().createScanQuery(p != null ? p : acceptAll());
 
@@ -323,8 +329,8 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K,
V
         else
             throw new IgniteException("Unsupported query predicate: " + filter);
 
-        return new QueryCursorImpl<>(new ClIter<Map.Entry<K,V>,Cache.Entry<K,V>>(fut)
{
-            @Override protected Cache.Entry<K,V> convert(Map.Entry<K,V> e) {
+        return new QueryCursorImpl<>(new ClIter<Map.Entry<K, V>, Cache.Entry<K,
V>>(fut) {
+            @Override protected Cache.Entry<K, V> convert(Map.Entry<K, V> e)
{
                 return new CacheEntryImpl<>(e.getKey(), e.getValue());
             }
         });
@@ -337,7 +343,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K,
V
      * @param loc Local flag.
      * @return Initial iteration cursor.
      */
-    private QueryCursor<Entry<K,V>> queryContinuous(ContinuousQuery<K, V>
qry, boolean loc) {
+    private QueryCursor<Entry<K, V>> queryContinuous(ContinuousQuery<K, V>
qry, boolean loc) {
         if (qry.getInitialQuery() instanceof ContinuousQuery)
             throw new IgniteException("Initial predicate for continuous query can't be an
instance of another " +
                 "continuous query. Use SCAN or SQL query for initial iteration.");
@@ -398,7 +404,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K,
V
 
     /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
-    @Override public QueryCursor<Entry<K,V>> query(Query qry) {
+    @Override public QueryCursor<Entry<K, V>> query(Query qry) {
         A.notNull(qry, "qry");
 
         GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
@@ -431,7 +437,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K,
V
         try {
             validate(qry);
 
-            CacheQuery<List<?>> q = ((GridCacheQueriesEx<K,V>)delegate.queries()).createSqlFieldsQuery(qry.getSql(),
false);
+            CacheQuery<List<?>> q = ((GridCacheQueriesEx<K, V>)delegate.queries()).createSqlFieldsQuery(qry.getSql(),
false);
 
             if (qry.getPageSize() > 0)
                 q.pageSize(qry.getPageSize());
@@ -459,8 +465,8 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K,
V
      * @param p Query.
      * @return Cursor.
      */
-    private QueryCursor<Entry<K,V>> doLocalQuery(SqlQuery p) {
-        return new QueryCursorImpl<>(ctx.kernalContext().query().<K,V>queryLocal(
+    private QueryCursor<Entry<K, V>> doLocalQuery(SqlQuery p) {
+        return new QueryCursorImpl<>(ctx.kernalContext().query().<K, V>queryLocal(
             ctx.name(), p.getType(), p.getSql(), p.getArgs()));
     }
 
@@ -486,7 +492,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K,
V
 
     /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
-    @Override public QueryCursor<Entry<K,V>> localQuery(Query qry) {
+    @Override public QueryCursor<Entry<K, V>> localQuery(Query qry) {
         A.notNull(qry, "qry");
 
         GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
@@ -498,7 +504,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K,
V
                 return doLocalQuery((SqlQuery)qry);
 
             if (qry instanceof ContinuousQuery)
-                return queryContinuous((ContinuousQuery<K,V>)qry, true);
+                return queryContinuous((ContinuousQuery<K, V>)qry, true);
 
             return doQuery(qry, projection(true));
         }
@@ -1144,8 +1150,8 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K,
V
 
     /** {@inheritDoc} */
     @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(Set<?
extends K> keys,
-        EntryProcessor<K, V, T> entryProcessor,
-        Object... args) {
+                                                                   EntryProcessor<K, V,
T> entryProcessor,
+                                                                   Object... args) {
         try {
             GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
 
@@ -1169,8 +1175,8 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K,
V
 
     /** {@inheritDoc} */
     @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(Set<?
extends K> keys,
-        IgniteEntryProcessor<K, V, T> entryProcessor,
-        Object... args) {
+                                                                   IgniteEntryProcessor<K,
V, T> entryProcessor,
+                                                                   Object... args) {
         try {
             GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
 
@@ -1317,39 +1323,25 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K,
V
     }
 
     /**
-     * Creates projection that will operate with portable objects.
-     * <p>
-     * Projection returned by this method will force cache not to deserialize portable objects,
-     * so keys and values will be returned from cache API methods without changes. Therefore,
-     * signature of the projection can contain only following types:
-     * <ul>
-     *     <li>{@code PortableObject} for portable classes</li>
-     *     <li>All primitives (byte, int, ...) and there boxed versions (Byte, Integer,
...)</li>
-     *     <li>Arrays of primitives (byte[], int[], ...)</li>
-     *     <li>{@link String} and array of {@link String}s</li>
-     *     <li>{@link UUID} and array of {@link UUID}s</li>
-     *     <li>{@link Date} and array of {@link Date}s</li>
-     *     <li>{@link java.sql.Timestamp} and array of {@link java.sql.Timestamp}s</li>
-     *     <li>Enums and array of enums</li>
-     *     <li>
-     *         Maps, collections and array of objects (but objects inside
-     *         them will still be converted if they are portable)
-     *     </li>
-     * </ul>
-     * <p>
-     * For example, if you use {@link Integer} as a key and {@code Value} class as a value
-     * (which will be stored in portable format), you should acquire following projection
-     * to avoid deserialization:
+     * Creates projection that will operate with portable objects. <p> Projection returned
by this method will force
+     * cache not to deserialize portable objects, so keys and values will be returned from
cache API methods without
+     * changes. Therefore, signature of the projection can contain only following types:
<ul> <li>{@code PortableObject}
+     * for portable classes</li> <li>All primitives (byte, int, ...) and there
boxed versions (Byte, Integer, ...)</li>
+     * <li>Arrays of primitives (byte[], int[], ...)</li> <li>{@link String}
and array of {@link String}s</li>
+     * <li>{@link UUID} and array of {@link UUID}s</li> <li>{@link Date}
and array of {@link Date}s</li> <li>{@link
+     * java.sql.Timestamp} and array of {@link java.sql.Timestamp}s</li> <li>Enums
and array of enums</li> <li> Maps,
+     * collections and array of objects (but objects inside them will still be converted
if they are portable) </li>
+     * </ul> <p> For example, if you use {@link Integer} as a key and {@code
Value} class as a value (which will be
+     * stored in portable format), you should acquire following projection to avoid deserialization:
      * <pre>
      * CacheProjection<Integer, GridPortableObject> prj = cache.keepPortable();
      *
      * // Value is not deserialized and returned in portable format.
      * GridPortableObject po = prj.get(1);
      * </pre>
-     * <p>
-     * Note that this method makes sense only if cache is working in portable mode
-     * ({@code CacheConfiguration#isPortableEnabled()} returns {@code true}. If not,
-     * this method is no-op and will return current projection.
+     * <p> Note that this method makes sense only if cache is working in portable mode
({@code
+     * CacheConfiguration#isPortableEnabled()} returns {@code true}. If not, this method
is no-op and will return
+     * current projection.
      *
      * @return Projection for portable objects.
      */
@@ -1433,6 +1425,14 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K,
V
         curFut.set(new IgniteFutureImpl<>(fut));
     }
 
+    /**
+     * @return Legacy proxy.
+     */
+    @NotNull
+    public GridCacheProxyImpl<K, V> legacyProxy() {
+        return legacyProxy;
+    }
+
     /** {@inheritDoc} */
     @Override public void writeExternal(ObjectOutput out) throws IOException {
         out.writeObject(ctx);
@@ -1452,6 +1452,8 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K,
V
         prj = (GridCacheProjectionImpl<K, V>)in.readObject();
 
         gate = ctx.gate();
+
+        legacyProxy = new GridCacheProxyImpl<K, V>(ctx, delegate, prj);
     }
 
     /** {@inheritDoc} */


Mime
View raw message