ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject incubator-ignite git commit: Revert "#ignite-732: IgniteCache.size() should not fail in case of topology changes."
Date Sat, 02 May 2015 07:53:33 GMT
Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-sprint-4 281f4ef20 -> c4bc92974


Revert "#ignite-732: IgniteCache.size() should not fail in case of topology changes."

This reverts commit 139aa270ae61494c0757867f2dc531ec7251b1da.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/c4bc9297
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/c4bc9297
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/c4bc9297

Branch: refs/heads/ignite-sprint-4
Commit: c4bc92974bace5e4cdb3ac9dc80790193e46d203
Parents: 281f4ef
Author: ivasilinets <ivasilinets@gridgain.com>
Authored: Sat May 2 10:05:35 2015 +0300
Committer: ivasilinets <ivasilinets@gridgain.com>
Committed: Sat May 2 10:05:35 2015 +0300

----------------------------------------------------------------------
 .../ignite/compute/ComputeTaskAdapter.java      |  14 +-
 .../processors/cache/GridCacheAdapter.java      | 503 +++++++++----------
 .../processors/cache/GridCacheProcessor.java    | 109 ++--
 3 files changed, 277 insertions(+), 349 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c4bc9297/modules/core/src/main/java/org/apache/ignite/compute/ComputeTaskAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/compute/ComputeTaskAdapter.java b/modules/core/src/main/java/org/apache/ignite/compute/ComputeTaskAdapter.java
index 87081fc..c2ad198 100644
--- a/modules/core/src/main/java/org/apache/ignite/compute/ComputeTaskAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/compute/ComputeTaskAdapter.java
@@ -24,16 +24,15 @@ import java.util.*;
 
 /**
  * Convenience adapter for {@link ComputeTask} interface. Here is an example of
- * how {@code ComputeTaskAdapter} can be used:
+ * how {@code GridComputeTaskAdapter} can be used:
  * <pre name="code" class="java">
- * public class MyFooBarTask extends ComputeTaskAdapter&lt;String, String&gt; {
+ * public class MyFooBarTask extends GridComputeTaskAdapter&lt;String, String&gt;
{
  *     // Inject load balancer.
  *     &#64;LoadBalancerResource
  *     ComputeLoadBalancer balancer;
  *
  *     // Map jobs to grid nodes.
- *     public Map&lt;? extends ComputeJob, GridNode&gt; map(List&lt;GridNode&gt;
subgrid, String arg)
- *         throws IgniteCheckedException {
+ *     public Map&lt;? extends ComputeJob, GridNode&gt; map(List&lt;GridNode&gt;
subgrid, String arg) throws IgniteCheckedException {
  *         Map&lt;MyFooBarJob, GridNode&gt; jobs = new HashMap&lt;MyFooBarJob,
GridNode&gt;(subgrid.size());
  *
  *         // In more complex cases, you can actually do
@@ -77,8 +76,8 @@ public abstract class ComputeTaskAdapter<T, R> implements ComputeTask<T, R> {
      * <p>
      * If remote job resulted in exception ({@link ComputeJobResult#getException()} is not
{@code null}),
      * then {@link ComputeJobResultPolicy#FAILOVER} policy will be returned if the exception
is instance
-     * of {@link org.apache.ignite.cluster.ClusterTopologyException} or {@link ComputeExecutionRejectedException},
-     * which means that remote node either failed or job execution was rejected before it
got a chance to start. In all
+     * of {@link org.apache.ignite.cluster.ClusterTopologyException} or {@link ComputeExecutionRejectedException},
which means that
+     * remote node either failed or job execution was rejected before it got a chance to
start. In all
      * other cases the exception will be rethrown which will ultimately cause task to fail.
      *
      * @param res Received remote grid executable result.
@@ -88,8 +87,7 @@ public abstract class ComputeTaskAdapter<T, R> implements ComputeTask<T, R> {
      * @throws IgniteException If handling a job result caused an error effectively rejecting
      *      a failover. This exception will be thrown out of {@link ComputeTaskFuture#get()}
method.
      */
-    @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult>
rcvd)
-        throws IgniteException {
+    @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult>
rcvd) throws IgniteException {
         IgniteException e = res.getException();
 
         // Try to failover if result is failed.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c4bc9297/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 39f19b1..3f4e97b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -75,9 +75,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K,
V
     /** */
     private static final long serialVersionUID = 0L;
 
-    /** Failed result. */
-    private static final Object FAIL = new Integer(-1);
-
     /** clearLocally() split threshold. */
     public static final int CLEAR_ALL_SPLIT_THRESHOLD = 10000;
 
@@ -885,7 +882,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K,
V
 
     /** {@inheritDoc} */
     @Override public Set<Cache.Entry<K, V>> entrySet() {
-        return entrySet((CacheEntryPredicate[])null);
+        return entrySet((CacheEntryPredicate[]) null);
     }
 
     /** {@inheritDoc} */
@@ -900,17 +897,17 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K,
V
 
     /** {@inheritDoc} */
     @Override public Set<K> keySet() {
-        return keySet((CacheEntryPredicate[])null);
+        return keySet((CacheEntryPredicate[]) null);
     }
 
     /** {@inheritDoc} */
     @Override public Set<K> primaryKeySet() {
-        return primaryKeySet((CacheEntryPredicate[])null);
+        return primaryKeySet((CacheEntryPredicate[]) null);
     }
 
     /** {@inheritDoc} */
     @Override public Collection<V> values() {
-        return values((CacheEntryPredicate[])null);
+        return values((CacheEntryPredicate[]) null);
     }
 
     /**
@@ -1083,31 +1080,36 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K,
V
 
     /** {@inheritDoc} */
     @Override public void clear() throws IgniteCheckedException {
-        clearAll(0, new GlobalClearAllCallable(name(), ctx.affinity().affinityTopologyVersion()));
+        // Clear local cache synchronously.
+        clearLocally();
+
+        clearRemotes(0, new GlobalClearAllCallable(name()));
     }
 
     /** {@inheritDoc} */
     @Override public void clear(K key) throws IgniteCheckedException {
-        clearAll(0, new GlobalClearKeySetCallable<K, V>(name(), ctx.affinity().affinityTopologyVersion(),
-            Collections.singleton(key)));
+        // Clear local cache synchronously.
+        clearLocally(key);
+
+        clearRemotes(0, new GlobalClearKeySetCallable<K, V>(name(), Collections.singleton(key)));
     }
 
     /** {@inheritDoc} */
     @Override public void clearAll(Set<? extends K> keys) throws IgniteCheckedException
{
-        clearAll(0, new GlobalClearKeySetCallable<K, V>(name(), ctx.affinity().affinityTopologyVersion(),
-            keys));
+        // Clear local cache synchronously.
+        clearLocallyAll(keys);
+
+        clearRemotes(0, new GlobalClearKeySetCallable<K, V>(name(), keys));
     }
 
     /** {@inheritDoc} */
     @Override public IgniteInternalFuture<?> clearAsync(K key) {
-        return clearAsync(new GlobalClearKeySetCallable<K, V>(name(), ctx.affinity().affinityTopologyVersion(),
-            Collections.singleton(key)));
+        return clearAsync(new GlobalClearKeySetCallable<K, V>(name(), Collections.singleton(key)));
     }
 
     /** {@inheritDoc} */
     @Override public IgniteInternalFuture<?> clearAsync(Set<? extends K> keys)
{
-        return clearAsync(new GlobalClearKeySetCallable<K, V>(name(), ctx.affinity().affinityTopologyVersion(),
-            keys));
+        return clearAsync(new GlobalClearKeySetCallable<K, V>(name(), keys));
     }
 
     /**
@@ -1116,13 +1118,19 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K,
V
      * @param clearCall Global clear callable object.
      * @throws IgniteCheckedException In case of cache could not be cleared on any of the
nodes.
      */
-    private void clearAll(long timeout, TopologyVersionAwareCallable clearCall) throws IgniteCheckedException
{
+    private void clearRemotes(long timeout, GlobalClearCallable clearCall) throws IgniteCheckedException
{
         try {
+            // Send job to remote nodes only.
+            Collection<ClusterNode> nodes =
+                ctx.grid().cluster().forCacheNodes(name(), true, true, false).forRemotes().nodes();
+
             IgniteInternalFuture<Object> fut = null;
 
-            ctx.kernalContext().task().setThreadContext(TC_TIMEOUT, timeout);
+            if (!nodes.isEmpty()) {
+                ctx.kernalContext().task().setThreadContext(TC_TIMEOUT, timeout);
 
-            fut = new ClearFuture(ctx, clearCall);
+                fut = ctx.closures().callAsyncNoFailover(BROADCAST, clearCall, nodes, true);
+            }
 
             if (fut != null)
                 fut.get();
@@ -1141,18 +1149,19 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K,
V
 
     /** {@inheritDoc} */
     @Override public IgniteInternalFuture<?> clearAsync() {
-        return clearAsync(new GlobalClearAllCallable(name(), ctx.affinity().affinityTopologyVersion()));
+        return clearAsync(new GlobalClearAllCallable(name()));
     }
 
     /**
      * @param clearCall Global clear callable object.
      * @return Future.
      */
-    private IgniteInternalFuture<?> clearAsync(TopologyVersionAwareCallable clearCall)
{
+    private IgniteInternalFuture<?> clearAsync(GlobalClearCallable clearCall) {
         Collection<ClusterNode> nodes = ctx.grid().cluster().forCacheNodes(name(),
true, true, false).nodes();
 
         if (!nodes.isEmpty()) {
-            IgniteInternalFuture<Object> fut = new ClearFuture(ctx, clearCall);
+            IgniteInternalFuture<Object> fut =
+                ctx.closures().callAsyncNoFailover(BROADCAST, clearCall, nodes, true);
 
             return fut.chain(new CX1<IgniteInternalFuture<Object>, Object>()
{
                 @Override public Object applyx(IgniteInternalFuture<Object> fut) throws
IgniteCheckedException {
@@ -2108,7 +2117,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K,
V
                 throws IgniteCheckedException {
                 Map<? extends K, EntryProcessor<K, V, Object>> invokeMap = F.viewAsMap(keys,
                     new C1<K, EntryProcessor<K, V, Object>>() {
-                        @Override public EntryProcessor apply(K k) {
+                            @Override public EntryProcessor apply(K k) {
                             return entryProcessor;
                         }
                     });
@@ -2136,7 +2145,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K,
V
         IgniteInternalFuture<?> fut = asyncOp(new AsyncInOp() {
             @Override public IgniteInternalFuture<GridCacheReturn> inOp(IgniteTxLocalAdapter
tx) {
                 Map<? extends K, EntryProcessor<K, V, Object>> invokeMap =
-                    Collections.singletonMap(key, (EntryProcessor<K, V, Object>)entryProcessor);
+                    Collections.singletonMap(key, (EntryProcessor<K, V, Object>) entryProcessor);
 
                 return tx.invokeAsync(ctx, invokeMap, args);
             }
@@ -2362,7 +2371,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K,
V
         IgniteInternalFuture<V> fut = asyncOp(new AsyncOp<V>() {
             @Override public IgniteInternalFuture<V> op(IgniteTxLocalAdapter tx) {
                 return tx.putAllAsync(ctx, F.t(key, val), true, null, -1, ctx.noValArray())
-                    .chain((IgniteClosure<IgniteInternalFuture<GridCacheReturn>,
V>)RET2VAL);
+                    .chain((IgniteClosure<IgniteInternalFuture<GridCacheReturn>,
V>) RET2VAL);
             }
 
             @Override public String toString() {
@@ -2517,7 +2526,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K,
V
         return asyncOp(new AsyncOp<Boolean>() {
             @Override public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter
tx) {
                 return tx.putAllAsync(ctx, F.t(key, val), false, null, -1, ctx.hasValArray()).chain(
-                    (IgniteClosure<IgniteInternalFuture<GridCacheReturn>, Boolean>)RET2FLAG);
+                    (IgniteClosure<IgniteInternalFuture<GridCacheReturn>, Boolean>)
RET2FLAG);
             }
 
             @Override public String toString() {
@@ -2906,7 +2915,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K,
V
                 if (ctx.deploymentEnabled())
                     ctx.deploy().registerClass(oldVal);
 
-                return tx.putAllAsync(ctx,
+                return (GridCacheReturn) tx.putAllAsync(ctx,
                         F.t(key, newVal),
                         true,
                         null,
@@ -3008,7 +3017,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K,
V
                     ctx.deploy().registerClass(val);
 
                 return tx.removeAllAsync(ctx, Collections.singletonList(key), null, false,
-                    ctx.equalsValArray(val)).get().success();
+                        ctx.equalsValArray(val)).get().success();
             }
 
             @Override public String toString() {
@@ -3221,10 +3230,10 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K,
V
         TransactionConfiguration cfg = ctx.gridConfig().getTransactionConfiguration();
 
         return txStart(
-            concurrency,
-            isolation,
-            cfg.getDefaultTxTimeout(),
-            0
+                concurrency,
+                isolation,
+                cfg.getDefaultTxTimeout(),
+                0
         );
     }
 
@@ -3567,7 +3576,22 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K,
V
         if (nodes.isEmpty())
             return new GridFinishedFuture<>(0);
 
-        return new SizeFuture(peekModes, ctx, modes.near);
+        IgniteInternalFuture<Collection<Integer>> fut =
+            ctx.closures().broadcastNoFailover(new SizeCallable(ctx.name(), peekModes), null,
nodes);
+
+        return fut.chain(new CX1<IgniteInternalFuture<Collection<Integer>>,
Integer>() {
+            @Override public Integer applyx(IgniteInternalFuture<Collection<Integer>>
fut)
+            throws IgniteCheckedException {
+                Collection<Integer> res = fut.get();
+
+                int totalSize = 0;
+
+                for (Integer size : res)
+                    totalSize += size;
+
+                return totalSize;
+            }
+        });
     }
 
     /** {@inheritDoc} */
@@ -3651,7 +3675,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K,
V
         return F.iterator(iterator(),
             new IgniteClosure<Cache.Entry<K, V>, Cache.Entry<K, V>>() {
                 private IgniteCacheExpiryPolicy expiryPlc =
-                    ctx.cache().expiryPolicy(opCtx != null ? opCtx.expiry() : null);
+                        ctx.cache().expiryPolicy(opCtx != null ? opCtx.expiry() : null);
 
                 @Override public Cache.Entry<K, V> apply(Cache.Entry<K, V> lazyEntry)
{
                     CacheOperationContext prev = ctx.gate().enter(opCtx);
@@ -3885,6 +3909,50 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K,
V
     }
 
     /**
+     * Gets cache global size (with or without backups).
+     *
+     * @param primaryOnly {@code True} if only primary sizes should be included.
+     * @return Global size.
+     * @throws IgniteCheckedException If internal task execution failed.
+     */
+    private int globalSize(boolean primaryOnly) throws IgniteCheckedException {
+        try {
+            // Send job to remote nodes only.
+            Collection<ClusterNode> nodes = ctx.grid().cluster().forCacheNodes(name()).forRemotes().nodes();
+
+            IgniteInternalFuture<Collection<Integer>> fut = null;
+
+            if (!nodes.isEmpty()) {
+                ctx.kernalContext().task().setThreadContext(TC_TIMEOUT, gridCfg.getNetworkTimeout());
+
+                fut = ctx.closures().broadcastNoFailover(new GlobalSizeCallable(name(), primaryOnly),
null, nodes);
+            }
+
+            // Get local value.
+            int globalSize = primaryOnly ? primarySize() : size();
+
+            if (fut != null) {
+                for (Integer i : fut.get())
+                    globalSize += i;
+            }
+
+            return globalSize;
+        }
+        catch (ClusterGroupEmptyCheckedException ignore) {
+            if (log.isDebugEnabled())
+                log.debug("All remote nodes left while cache clearLocally [cacheName=" +
name() + "]");
+
+            return primaryOnly ? primarySize() : size();
+        }
+        catch (ComputeTaskTimeoutCheckedException e) {
+            U.warn(log, "Timed out waiting for remote nodes to finish cache clear (consider
increasing " +
+                "'networkTimeout' configuration property) [cacheName=" + name() + "]");
+
+            throw e;
+        }
+    }
+
+    /**
      * @param op Cache operation.
      * @param <T> Return type.
      * @return Operation result.
@@ -4825,10 +4893,47 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K,
V
     }
 
     /**
+     * Internal callable which performs clear operation on a cache with the given name.
+     */
+    @GridInternal
+    private static abstract class GlobalClearCallable implements Callable<Object>,
Externalizable {
+        /** Cache name. */
+        protected String cacheName;
+
+        /** Injected grid instance. */
+        @IgniteInstanceResource
+        protected Ignite ignite;
+
+        /**
+         * Empty constructor for serialization.
+         */
+        public GlobalClearCallable() {
+            // No-op.
+        }
+
+        /**
+         * @param cacheName Cache name.
+         */
+        protected GlobalClearCallable(String cacheName) {
+            this.cacheName = cacheName;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void writeExternal(ObjectOutput out) throws IOException {
+            U.writeString(out, cacheName);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException
{
+            cacheName = U.readString(in);
+        }
+    }
+
+    /**
      * Global clear all.
      */
     @GridInternal
-    private static class GlobalClearAllCallable extends TopologyVersionAwareCallable {
+    private static class GlobalClearAllCallable extends GlobalClearCallable {
         /** */
         private static final long serialVersionUID = 0L;
 
@@ -4841,30 +4946,24 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K,
V
 
         /**
          * @param cacheName Cache name.
-         * @param topVer Affinity topology version.
          */
-        private GlobalClearAllCallable(String cacheName, AffinityTopologyVersion topVer)
{
-            super(cacheName, topVer);
+        private GlobalClearAllCallable(String cacheName) {
+            super(cacheName);
         }
 
         /** {@inheritDoc} */
-        @Override protected Object callLocal() {
+        @Override public Object call() throws Exception {
             ((IgniteEx)ignite).cachex(cacheName).clearLocally();
 
             return null;
         }
-
-        /** {@inheritDoc} */
-        @Override protected Collection<ClusterNode> nodes(GridCacheContext ctx) {
-            return ctx.grid().cluster().forCacheNodes(ctx.name(), true, true, false).nodes();
-        }
     }
 
     /**
      * Global clear keys.
      */
     @GridInternal
-    private static class GlobalClearKeySetCallable<K, V> extends TopologyVersionAwareCallable
{
+    private static class GlobalClearKeySetCallable<K, V> extends GlobalClearCallable
{
         /** */
         private static final long serialVersionUID = 0L;
 
@@ -4880,25 +4979,33 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K,
V
 
         /**
          * @param cacheName Cache name.
-         * @param topVer Affinity topology version.
          * @param keys Keys to clear.
          */
-        protected GlobalClearKeySetCallable(String cacheName, AffinityTopologyVersion topVer,
Set<? extends K> keys) {
-            super(cacheName, topVer);
+        private GlobalClearKeySetCallable(String cacheName, Set<? extends K> keys)
{
+            super(cacheName);
 
             this.keys = keys;
         }
 
         /** {@inheritDoc} */
-        @Override protected Collection<ClusterNode> nodes(GridCacheContext ctx) {
-            return ctx.grid().cluster().forCacheNodes(ctx.name(), true, true, false).nodes();
+        @Override public Object call() throws Exception {
+            ((IgniteEx)ignite).<K, V>cachex(cacheName).clearLocallyAll(keys);
+
+            return null;
         }
 
         /** {@inheritDoc} */
-        @Override protected Object callLocal() {
-            ((IgniteEx)ignite).<K, V>cachex(cacheName).clearLocallyAll(keys);
+        @Override public void writeExternal(ObjectOutput out) throws IOException {
+            super.writeExternal(out);
 
-            return null;
+            out.writeObject(keys);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException
{
+            super.readExternal(in);
+
+            keys = (Set<K>) in.readObject();
         }
     }
 
@@ -4906,202 +5013,127 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K,
V
      * Internal callable for global size calculation.
      */
     @GridInternal
-    private static class GlobalSizeCallable extends TopologyVersionAwareCallable {
+    private static class SizeCallable extends IgniteClosureX<Object, Integer> implements
Externalizable {
         /** */
         private static final long serialVersionUID = 0L;
 
+        /** Cache name. */
+        private String cacheName;
+
         /** Peek modes. */
         private CachePeekMode[] peekModes;
 
-        /** Near enable. */
-        private boolean nearEnable;
+        /** Injected grid instance. */
+        @IgniteInstanceResource
+        private Ignite ignite;
 
         /**
          * Required by {@link Externalizable}.
          */
-        public GlobalSizeCallable() {
+        public SizeCallable() {
             // No-op.
         }
 
         /**
          * @param cacheName Cache name.
-         * @param topVer Affinity topology version.
          * @param peekModes Cache peek modes.
          */
-        private GlobalSizeCallable(String cacheName, AffinityTopologyVersion topVer, CachePeekMode[]
peekModes, boolean nearEnable) {
-            super(cacheName, topVer);
-
+        private SizeCallable(String cacheName, CachePeekMode[] peekModes) {
+            this.cacheName = cacheName;
             this.peekModes = peekModes;
-            this.nearEnable = nearEnable;
-        }
-
-        /** {@inheritDoc} */
-        @Override protected Object callLocal() {
-            try {
-                IgniteInternalCache<Object, Object> cache = ((IgniteEx)ignite).cachex(cacheName);
-
-                return cache == null ? 0 : cache.localSize(peekModes);
-            }
-            catch (IgniteCheckedException e) {
-                throw U.convertException(e);
-            }
         }
 
         /** {@inheritDoc} */
-        @Override protected Collection<ClusterNode> nodes(GridCacheContext ctx) {
-            IgniteClusterEx cluster = ctx.grid().cluster();
+        @Override public Integer applyx(Object o) throws IgniteCheckedException {
+            IgniteInternalCache<Object, Object> cache = ((IgniteEx)ignite).cachex(cacheName);
 
-            ClusterGroup grp = nearEnable ? cluster.forCacheNodes(ctx.name(), true, true,
false) : cluster.forDataNodes(ctx.name());
+            assert cache != null : cacheName;
 
-            return grp.nodes();
+            return cache.localSize(peekModes);
         }
 
         /** {@inheritDoc} */
-        public String toString() {
-            return S.toString(GlobalSizeCallable.class, this);
-        }
-    }
-
-    /**
-     * Cache size future.
-     */
-    private static class SizeFuture extends RetryFuture {
-        /** Size. */
-        private int size = 0;
-
-        /**
-         * @param peekModes Peek modes.
-         */
-        public SizeFuture(CachePeekMode[] peekModes, GridCacheContext ctx, boolean near)
{
-            super(ctx, new GlobalSizeCallable(ctx.name(), ctx.affinity().affinityTopologyVersion(),
peekModes, near));
-        }
+        @SuppressWarnings("ForLoopReplaceableByForEach")
+        @Override public void writeExternal(ObjectOutput out) throws IOException {
+            U.writeString(out, cacheName);
 
-        /** {@inheritDoc} */
-        @Override protected void onInit() {
-            size = 0;
-        }
+            out.writeInt(peekModes.length);
 
-        /** {@inheritDoc} */
-        @Override protected void onLocal(Object localRes) {
-            size += (Integer)localRes;
+            for (int i = 0; i < peekModes.length; i++)
+                U.writeEnum(out, peekModes[i]);
         }
 
         /** {@inheritDoc} */
-        @Override protected void allDone() {
-            onDone(size);
-        }
-    }
+        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException
{
+            cacheName = U.readString(in);
 
-    /**
-     * Cache clear future.
-     */
-    private static class ClearFuture extends RetryFuture {
-        /**
-         */
-        public ClearFuture(GridCacheContext ctx, TopologyVersionAwareCallable clearCall)
{
-            super(ctx, clearCall);
-        }
+            int len = in.readInt();
 
-        /** {@inheritDoc} */
-        @Override protected void onInit() {
-           // No-op.
-        }
+            peekModes = new CachePeekMode[len];
 
-        /** {@inheritDoc} */
-        @Override protected void onLocal(Object localRes) {
-            // No-op.
+            for (int i = 0; i < len; i++)
+                peekModes[i] = CachePeekMode.fromOrdinal(in.readByte());
         }
 
         /** {@inheritDoc} */
-        @Override protected void allDone() {
-            onDone();
+        public String toString() {
+            return S.toString(SizeCallable.class, this);
         }
     }
 
     /**
-     * Retry future.
+     * Internal callable which performs {@link IgniteInternalCache#size()} or {@link IgniteInternalCache#primarySize()}
+     * operation on a cache with the given name.
      */
-    protected static abstract class RetryFuture<T> extends GridFutureAdapter<T>
{
-        /** Context. */
-        private final GridCacheContext ctx;
+    @GridInternal
+    private static class GlobalSizeCallable implements IgniteClosure<Object, Integer>,
Externalizable {
+        /** */
+        private static final long serialVersionUID = 0L;
 
-        /** Callable. */
-        private final TopologyVersionAwareCallable call;
+        /** Cache name. */
+        private String cacheName;
 
-        /** Max retries count before issuing an error. */
-        private volatile int retries = 32;
+        /** Primary only flag. */
+        private boolean primaryOnly;
+
+        /** Injected grid instance. */
+        @IgniteInstanceResource
+        private Ignite ignite;
 
         /**
+         * Empty constructor for serialization.
          */
-        public RetryFuture(GridCacheContext ctx, TopologyVersionAwareCallable call) {
-            this.ctx = ctx;
-            this.call = call;
-
-            init();
+        public GlobalSizeCallable() {
+            // No-op.
         }
 
         /**
-         * Init.
+         * @param cacheName Cache name.
+         * @param primaryOnly Primary only flag.
          */
-        private void init() {
-            Collection<ClusterNode> nodes = call.nodes(ctx);
-
-            call.topologyVersion(ctx.affinity().affinityTopologyVersion());
-
-            IgniteInternalFuture<Collection<Object>> fut = ctx.closures().callAsyncNoFailover(BROADCAST,
-                F.asSet((Callable<Object>)call), nodes, true);
-
-            fut.listen(new IgniteInClosure<IgniteInternalFuture<Collection<Object>>>()
{
-                @Override public void apply(IgniteInternalFuture<Collection<Object>>
fut) {
-                    try {
-                        Collection<Object> res = fut.get();
-
-                        onInit();
-
-                        for (Object locRes : res) {
-                            if (locRes == FAIL) {
-                                if (retries-- > 0)
-                                    init();
-                                else {
-                                    onDone(new ClusterTopologyException("Failed to wait topology."));
-
-                                    return;
-                                }
-                            }
+        private GlobalSizeCallable(String cacheName, boolean primaryOnly) {
+            this.cacheName = cacheName;
+            this.primaryOnly = primaryOnly;
+        }
 
-                            onLocal(locRes);
-                        }
+        /** {@inheritDoc} */
+        @Override public Integer apply(Object o) {
+            IgniteInternalCache<Object, Object> cache = ((IgniteEx)ignite).cachex(cacheName);
 
-                        allDone();
-                    }
-                    catch (IgniteCheckedException e) {
-                        if (X.hasCause(e, ClusterTopologyException.class)) {
-                            if (retries-- > 0)
-                                init();
-                            else
-                                onDone(e);
-                        }
-                        else
-                            onDone(e);
-                    }
-                }
-            });
+            return primaryOnly ? cache.primarySize() : cache.size();
         }
 
-        /**
-         * Init reducer.
-         */
-        protected abstract void onInit();
-
-        /**
-         * @param localRes Add local result to global result.
-         */
-        protected abstract void onLocal(Object localRes);
+        /** {@inheritDoc} */
+        @Override public void writeExternal(ObjectOutput out) throws IOException {
+            U.writeString(out, cacheName);
+            out.writeBoolean(primaryOnly);
+        }
 
-        /**
-         * On done.
-         */
-        protected abstract void allDone();
+        /** {@inheritDoc} */
+        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException
{
+            cacheName = U.readString(in);
+            primaryOnly = in.readBoolean();
+        }
     }
 
     /**
@@ -5665,89 +5697,4 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
             metrics.addPutAndGetTimeNanos(System.nanoTime() - start);
         }
     }
-
-    /**
-     * Delayed callable class.
-     */
-    protected static abstract class TopologyVersionAwareCallable<K, V> implements Serializable, Callable<Object> {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /** Injected grid instance. */
-        @IgniteInstanceResource
-        protected Ignite ignite;
-
-        /** Affinity topology version. */
-        protected AffinityTopologyVersion topVer;
-
-        /** Cache name. */
-        protected String cacheName;
-
-        /**
-         * Empty constructor for serialization.
-         */
-        public TopologyVersionAwareCallable() {
-            // No-op.
-        }
-
-        /**
-         * @param topVer Affinity topology version.
-         */
-        public TopologyVersionAwareCallable(String cacheName, AffinityTopologyVersion topVer) {
-            this.cacheName = cacheName;
-            this.topVer = topVer;
-        }
-
-        /** {@inheritDoc} */
-        @Override public Object call() throws Exception {
-            if (!compareTopologyVersions())
-                return FAIL;
-
-            Object res = callLocal();
-
-            if (!compareTopologyVersions())
-                return FAIL;
-            else
-                return res;
-        }
-
-        /**
-         * Call local.
-         *
-         * @return Local result.
-         * @throws IgniteCheckedException If failed.
-         */
-        protected abstract Object callLocal() throws IgniteCheckedException;
-
-        /**
-         * @param ctx Grid cache context.
-         * @return Nodes to call.
-         */
-        protected abstract Collection<ClusterNode> nodes(GridCacheContext ctx);
-
-        /**
-         * Compare topology versions.
-         */
-        public boolean compareTopologyVersions() {
-            GridCacheProcessor cacheProc = ((IgniteKernal) ignite).context().cache();
-
-            GridCacheAdapter<K, V> cacheAdapter = cacheProc.internalCache(cacheName);
-
-            if (cacheAdapter == null)
-                return false;
-
-            final GridCacheContext<K, V> ctx = cacheAdapter.context();
-
-            AffinityTopologyVersion locTopVer = ctx.affinity().affinityTopologyVersion();
-
-            return locTopVer.compareTo(topVer) == 0;
-        }
-
-        /**
-         * @param topVer Affinity topology version.
-         */
-        public void topologyVersion(AffinityTopologyVersion topVer) {
-            this.topVer = topVer;
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c4bc9297/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 77fa104..c0026ab 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -124,9 +124,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     /** Must use JDK marshaller since it is used by discovery to fire custom events. */
     private Marshaller marshaller = new JdkMarshaller();
 
-    /** Count down latch for caches. */
-    private CountDownLatch cacheStartedLatch = new CountDownLatch(1);
-
     /**
      * @param ctx Kernal context.
      */
@@ -660,92 +657,87 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
     @Override public void onKernalStart() throws IgniteCheckedException {
-        try {
-            if (ctx.config().isDaemon())
-                return;
+        if (ctx.config().isDaemon())
+            return;
 
-            ClusterNode locNode = ctx.discovery().localNode();
+        ClusterNode locNode = ctx.discovery().localNode();
 
-            // Init cache plugin managers.
-            final Map<String, CachePluginManager> cache2PluginMgr = new HashMap<>();
+        // Init cache plugin managers.
+        final Map<String, CachePluginManager> cache2PluginMgr = new HashMap<>();
 
-            for (DynamicCacheDescriptor desc : registeredCaches.values()) {
-                CacheConfiguration locCcfg = desc.cacheConfiguration();
+        for (DynamicCacheDescriptor desc : registeredCaches.values()) {
+            CacheConfiguration locCcfg = desc.cacheConfiguration();
 
-                CachePluginManager pluginMgr = new CachePluginManager(ctx, locCcfg);
+            CachePluginManager pluginMgr = new CachePluginManager(ctx, locCcfg);
 
-                cache2PluginMgr.put(locCcfg.getName(), pluginMgr);
-            }
+            cache2PluginMgr.put(locCcfg.getName(), pluginMgr);
+        }
 
-            if (!getBoolean(IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK)) {
-                for (ClusterNode n : ctx.discovery().remoteNodes()) {
-                    checkTransactionConfiguration(n);
+        if (!getBoolean(IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK)) {
+            for (ClusterNode n : ctx.discovery().remoteNodes()) {
+                checkTransactionConfiguration(n);
 
-                    DeploymentMode locDepMode = ctx.config().getDeploymentMode();
-                    DeploymentMode rmtDepMode = n.attribute(IgniteNodeAttributes.ATTR_DEPLOYMENT_MODE);
+                DeploymentMode locDepMode = ctx.config().getDeploymentMode();
+                DeploymentMode rmtDepMode = n.attribute(IgniteNodeAttributes.ATTR_DEPLOYMENT_MODE);
 
-                    CU.checkAttributeMismatch(log, null, n.id(), "deploymentMode", "Deployment mode",
-                        locDepMode, rmtDepMode, true);
+                CU.checkAttributeMismatch(log, null, n.id(), "deploymentMode", "Deployment mode",
+                    locDepMode, rmtDepMode, true);
 
-                    for (DynamicCacheDescriptor desc : registeredCaches.values()) {
-                        CacheConfiguration rmtCfg = desc.remoteConfiguration(n.id());
+                for (DynamicCacheDescriptor desc : registeredCaches.values()) {
+                    CacheConfiguration rmtCfg = desc.remoteConfiguration(n.id());
 
-                        if (rmtCfg != null) {
-                            CacheConfiguration locCfg = desc.cacheConfiguration();
+                    if (rmtCfg != null) {
+                        CacheConfiguration locCfg = desc.cacheConfiguration();
 
-                            checkCache(locCfg, rmtCfg, n);
+                        checkCache(locCfg, rmtCfg, n);
 
-                            // Check plugin cache configurations.
-                            CachePluginManager pluginMgr = cache2PluginMgr.get(locCfg.getName());
+                        // Check plugin cache configurations.
+                        CachePluginManager pluginMgr = cache2PluginMgr.get(locCfg.getName());
 
-                            assert pluginMgr != null : " Map=" + cache2PluginMgr;
+                        assert pluginMgr != null : " Map=" + cache2PluginMgr;
 
-                            pluginMgr.validateRemotes(rmtCfg, n);
-                        }
+                        pluginMgr.validateRemotes(rmtCfg, n);
                     }
                 }
             }
+        }
 
-            // Start dynamic caches received from collect discovery data.
-            for (DynamicCacheDescriptor desc : registeredCaches.values()) {
-                boolean started = desc.onStart();
+        // Start dynamic caches received from collect discovery data.
+        for (DynamicCacheDescriptor desc : registeredCaches.values()) {
+            boolean started = desc.onStart();
 
-                assert started : "Failed to change started flag for locally configured cache: " + desc;
+            assert started : "Failed to change started flag for locally configured cache: " + desc;
 
-                desc.clearRemoteConfigurations();
+            desc.clearRemoteConfigurations();
 
-                CacheConfiguration ccfg = desc.cacheConfiguration();
+            CacheConfiguration ccfg = desc.cacheConfiguration();
 
-                IgnitePredicate filter = ccfg.getNodeFilter();
+            IgnitePredicate filter = ccfg.getNodeFilter();
 
-                if (filter.apply(locNode)) {
-                    CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(ccfg);
+            if (filter.apply(locNode)) {
+                CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(ccfg);
 
-                    CachePluginManager pluginMgr = cache2PluginMgr.get(ccfg.getName());
+                CachePluginManager pluginMgr = cache2PluginMgr.get(ccfg.getName());
 
-                    assert pluginMgr != null : " Map=" + cache2PluginMgr;
+                assert pluginMgr != null : " Map=" + cache2PluginMgr;
 
-                    GridCacheContext ctx = createCache(ccfg, pluginMgr, desc.cacheType(), cacheObjCtx);
+                GridCacheContext ctx = createCache(ccfg, pluginMgr, desc.cacheType(), cacheObjCtx);
 
-                    ctx.dynamicDeploymentId(desc.deploymentId());
+                ctx.dynamicDeploymentId(desc.deploymentId());
 
-                    sharedCtx.addCacheContext(ctx);
+                sharedCtx.addCacheContext(ctx);
 
-                    GridCacheAdapter cache = ctx.cache();
+                GridCacheAdapter cache = ctx.cache();
 
-                    String name = ccfg.getName();
+                String name = ccfg.getName();
 
-                    caches.put(maskNull(name), cache);
+                caches.put(maskNull(name), cache);
 
-                    startCache(cache);
+                startCache(cache);
 
-                    jCacheProxies.put(maskNull(name), new IgniteCacheProxy(ctx, cache, null, false));
-                }
+                jCacheProxies.put(maskNull(name), new IgniteCacheProxy(ctx, cache, null, false));
             }
         }
-        finally {
-            cacheStartedLatch.countDown();
-        }
 
         ctx.marshallerContext().onMarshallerCacheStarted(ctx);
 
@@ -843,8 +835,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
     @Override public void onKernalStop(boolean cancel) {
-        cacheStartedLatch.countDown();
-
         if (ctx.config().isDaemon())
             return;
 
@@ -2696,13 +2686,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         if (log.isDebugEnabled())
             log.debug("Getting internal cache adapter: " + name);
 
-        try {
-            cacheStartedLatch.await();
-        }
-        catch (InterruptedException e) {
-            throw new IgniteException("Failed to wait starting caches.");
-        }
-
         return (GridCacheAdapter<K, V>)caches.get(maskNull(name));
     }
 


Mime
View raw message