ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject incubator-ignite git commit: ignite-732 IgniteCache.size() should not fail in case of topology changes
Date Wed, 29 Apr 2015 22:46:52 GMT
Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-732 e50ba9efe -> b7e7f239c (forced update)


ignite-732 IgniteCache.size() should not fail in case of topology changes


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/b7e7f239
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/b7e7f239
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/b7e7f239

Branch: refs/heads/ignite-732
Commit: b7e7f239c4d7090db047a583cfab9bf50acadf45
Parents: dfca76b
Author: agura <agura@gridgain.com>
Authored: Wed Apr 29 22:20:57 2015 +0300
Committer: agura <agura@gridgain.com>
Committed: Thu Apr 30 01:45:54 2015 +0300

----------------------------------------------------------------------
 .../ignite/compute/ComputeTaskAdapter.java      |  14 +-
 .../processors/cache/GridCacheAdapter.java      | 173 +++++++------------
 .../processors/task/GridTaskWorker.java         |   3 +-
 .../cache/GridCacheSizeTopologyChangedTest.java | 149 ++++++++++++++++
 4 files changed, 218 insertions(+), 121 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b7e7f239/modules/core/src/main/java/org/apache/ignite/compute/ComputeTaskAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/compute/ComputeTaskAdapter.java b/modules/core/src/main/java/org/apache/ignite/compute/ComputeTaskAdapter.java
index c2ad198..87081fc 100644
--- a/modules/core/src/main/java/org/apache/ignite/compute/ComputeTaskAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/compute/ComputeTaskAdapter.java
@@ -24,15 +24,16 @@ import java.util.*;
 
 /**
  * Convenience adapter for {@link ComputeTask} interface. Here is an example of
- * how {@code GridComputeTaskAdapter} can be used:
+ * how {@code ComputeTaskAdapter} can be used:
  * <pre name="code" class="java">
- * public class MyFooBarTask extends GridComputeTaskAdapter&lt;String, String&gt; {
+ * public class MyFooBarTask extends ComputeTaskAdapter&lt;String, String&gt; {
  *     // Inject load balancer.
  *     &#64;LoadBalancerResource
  *     ComputeLoadBalancer balancer;
  *
  *     // Map jobs to grid nodes.
- *     public Map&lt;? extends ComputeJob, GridNode&gt; map(List&lt;GridNode&gt;
subgrid, String arg) throws IgniteCheckedException {
+ *     public Map&lt;? extends ComputeJob, GridNode&gt; map(List&lt;GridNode&gt;
subgrid, String arg)
+ *         throws IgniteCheckedException {
  *         Map&lt;MyFooBarJob, GridNode&gt; jobs = new HashMap&lt;MyFooBarJob,
GridNode&gt;(subgrid.size());
  *
  *         // In more complex cases, you can actually do
@@ -76,8 +77,8 @@ public abstract class ComputeTaskAdapter<T, R> implements ComputeTask<T, R> {
      * <p>
      * If remote job resulted in exception ({@link ComputeJobResult#getException()} is not
{@code null}),
      * then {@link ComputeJobResultPolicy#FAILOVER} policy will be returned if the exception
is instance
-     * of {@link org.apache.ignite.cluster.ClusterTopologyException} or {@link ComputeExecutionRejectedException},
which means that
-     * remote node either failed or job execution was rejected before it got a chance to
start. In all
+     * of {@link org.apache.ignite.cluster.ClusterTopologyException} or {@link ComputeExecutionRejectedException},
+     * which means that remote node either failed or job execution was rejected before it
got a chance to start. In all
      * other cases the exception will be rethrown which will ultimately cause task to fail.
      *
      * @param res Received remote grid executable result.
@@ -87,7 +88,8 @@ public abstract class ComputeTaskAdapter<T, R> implements ComputeTask<T, R> {
      * @throws IgniteException If handling a job result caused an error effectively rejecting
      *      a failover. This exception will be thrown out of {@link ComputeTaskFuture#get()}
method.
      */
-    @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) throws IgniteException {
+    @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd)
+        throws IgniteException {
         IgniteException e = res.getException();
 
         // Try to failover if result is failed.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b7e7f239/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 3f4e97b..83b5ca5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -882,7 +882,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K,
V
 
     /** {@inheritDoc} */
     @Override public Set<Cache.Entry<K, V>> entrySet() {
-        return entrySet((CacheEntryPredicate[]) null);
+        return entrySet((CacheEntryPredicate[])null);
     }
 
     /** {@inheritDoc} */
@@ -897,17 +897,17 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K,
V
 
     /** {@inheritDoc} */
     @Override public Set<K> keySet() {
-        return keySet((CacheEntryPredicate[]) null);
+        return keySet((CacheEntryPredicate[])null);
     }
 
     /** {@inheritDoc} */
     @Override public Set<K> primaryKeySet() {
-        return primaryKeySet((CacheEntryPredicate[]) null);
+        return primaryKeySet((CacheEntryPredicate[])null);
     }
 
     /** {@inheritDoc} */
     @Override public Collection<V> values() {
-        return values((CacheEntryPredicate[]) null);
+        return values((CacheEntryPredicate[])null);
     }
 
     /**
@@ -2117,7 +2117,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K,
V
                 throws IgniteCheckedException {
                 Map<? extends K, EntryProcessor<K, V, Object>> invokeMap = F.viewAsMap(keys,
                     new C1<K, EntryProcessor<K, V, Object>>() {
-                            @Override public EntryProcessor apply(K k) {
+                        @Override public EntryProcessor apply(K k) {
                             return entryProcessor;
                         }
                     });
@@ -2145,7 +2145,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K,
V
         IgniteInternalFuture<?> fut = asyncOp(new AsyncInOp() {
             @Override public IgniteInternalFuture<GridCacheReturn> inOp(IgniteTxLocalAdapter
tx) {
                 Map<? extends K, EntryProcessor<K, V, Object>> invokeMap =
-                    Collections.singletonMap(key, (EntryProcessor<K, V, Object>) entryProcessor);
+                    Collections.singletonMap(key, (EntryProcessor<K, V, Object>)entryProcessor);
 
                 return tx.invokeAsync(ctx, invokeMap, args);
             }
@@ -2371,7 +2371,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K,
V
         IgniteInternalFuture<V> fut = asyncOp(new AsyncOp<V>() {
             @Override public IgniteInternalFuture<V> op(IgniteTxLocalAdapter tx) {
                 return tx.putAllAsync(ctx, F.t(key, val), true, null, -1, ctx.noValArray())
-                    .chain((IgniteClosure<IgniteInternalFuture<GridCacheReturn>,
V>) RET2VAL);
+                    .chain((IgniteClosure<IgniteInternalFuture<GridCacheReturn>,
V>)RET2VAL);
             }
 
             @Override public String toString() {
@@ -2526,7 +2526,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K,
V
         return asyncOp(new AsyncOp<Boolean>() {
             @Override public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter
tx) {
                 return tx.putAllAsync(ctx, F.t(key, val), false, null, -1, ctx.hasValArray()).chain(
-                    (IgniteClosure<IgniteInternalFuture<GridCacheReturn>, Boolean>)
RET2FLAG);
+                    (IgniteClosure<IgniteInternalFuture<GridCacheReturn>, Boolean>)RET2FLAG);
             }
 
             @Override public String toString() {
@@ -2915,7 +2915,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K,
V
                 if (ctx.deploymentEnabled())
                     ctx.deploy().registerClass(oldVal);
 
-                return (GridCacheReturn) tx.putAllAsync(ctx,
+                return tx.putAllAsync(ctx,
                         F.t(key, newVal),
                         true,
                         null,
@@ -3017,7 +3017,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K,
V
                     ctx.deploy().registerClass(val);
 
                 return tx.removeAllAsync(ctx, Collections.singletonList(key), null, false,
-                        ctx.equalsValArray(val)).get().success();
+                    ctx.equalsValArray(val)).get().success();
             }
 
             @Override public String toString() {
@@ -3230,10 +3230,10 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K,
V
         TransactionConfiguration cfg = ctx.gridConfig().getTransactionConfiguration();
 
         return txStart(
-                concurrency,
-                isolation,
-                cfg.getDefaultTxTimeout(),
-                0
+            concurrency,
+            isolation,
+            cfg.getDefaultTxTimeout(),
+            0
         );
     }
 
@@ -3576,22 +3576,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K,
V
         if (nodes.isEmpty())
             return new GridFinishedFuture<>(0);
 
-        IgniteInternalFuture<Collection<Integer>> fut =
-            ctx.closures().broadcastNoFailover(new SizeCallable(ctx.name(), peekModes), null,
nodes);
-
-        return fut.chain(new CX1<IgniteInternalFuture<Collection<Integer>>,
Integer>() {
-            @Override public Integer applyx(IgniteInternalFuture<Collection<Integer>>
fut)
-            throws IgniteCheckedException {
-                Collection<Integer> res = fut.get();
-
-                int totalSize = 0;
-
-                for (Integer size : res)
-                    totalSize += size;
-
-                return totalSize;
-            }
-        });
+        return new SizeFuture(peekModes, ctx, nodes);
     }
 
     /** {@inheritDoc} */
@@ -3675,7 +3660,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K,
V
         return F.iterator(iterator(),
             new IgniteClosure<Cache.Entry<K, V>, Cache.Entry<K, V>>() {
                 private IgniteCacheExpiryPolicy expiryPlc =
-                        ctx.cache().expiryPolicy(opCtx != null ? opCtx.expiry() : null);
+                    ctx.cache().expiryPolicy(opCtx != null ? opCtx.expiry() : null);
 
                 @Override public Cache.Entry<K, V> apply(Cache.Entry<K, V> lazyEntry)
{
                     CacheOperationContext prev = ctx.gate().enter(opCtx);
@@ -3909,50 +3894,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K,
V
     }
 
     /**
-     * Gets cache global size (with or without backups).
-     *
-     * @param primaryOnly {@code True} if only primary sizes should be included.
-     * @return Global size.
-     * @throws IgniteCheckedException If internal task execution failed.
-     */
-    private int globalSize(boolean primaryOnly) throws IgniteCheckedException {
-        try {
-            // Send job to remote nodes only.
-            Collection<ClusterNode> nodes = ctx.grid().cluster().forCacheNodes(name()).forRemotes().nodes();
-
-            IgniteInternalFuture<Collection<Integer>> fut = null;
-
-            if (!nodes.isEmpty()) {
-                ctx.kernalContext().task().setThreadContext(TC_TIMEOUT, gridCfg.getNetworkTimeout());
-
-                fut = ctx.closures().broadcastNoFailover(new GlobalSizeCallable(name(), primaryOnly),
null, nodes);
-            }
-
-            // Get local value.
-            int globalSize = primaryOnly ? primarySize() : size();
-
-            if (fut != null) {
-                for (Integer i : fut.get())
-                    globalSize += i;
-            }
-
-            return globalSize;
-        }
-        catch (ClusterGroupEmptyCheckedException ignore) {
-            if (log.isDebugEnabled())
-                log.debug("All remote nodes left while cache clearLocally [cacheName=" +
name() + "]");
-
-            return primaryOnly ? primarySize() : size();
-        }
-        catch (ComputeTaskTimeoutCheckedException e) {
-            U.warn(log, "Timed out waiting for remote nodes to finish cache clear (consider
increasing " +
-                "'networkTimeout' configuration property) [cacheName=" + name() + "]");
-
-            throw e;
-        }
-    }
-
-    /**
      * @param op Cache operation.
      * @param <T> Return type.
      * @return Operation result.
@@ -5047,9 +4988,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K,
V
         @Override public Integer applyx(Object o) throws IgniteCheckedException {
             IgniteInternalCache<Object, Object> cache = ((IgniteEx)ignite).cachex(cacheName);
 
-            assert cache != null : cacheName;
-
-            return cache.localSize(peekModes);
+            return cache == null ? 0 : cache.localSize(peekModes);
         }
 
         /** {@inheritDoc} */
@@ -5082,57 +5021,63 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K,
V
     }
 
     /**
-     * Internal callable which performs {@link IgniteInternalCache#size()} or {@link IgniteInternalCache#primarySize()}
-     * operation on a cache with the given name.
+     * Cache size future.
      */
-    @GridInternal
-    private static class GlobalSizeCallable implements IgniteClosure<Object, Integer>,
Externalizable {
-        /** */
-        private static final long serialVersionUID = 0L;
+    private static class SizeFuture extends GridFutureAdapter<Integer> {
+        /** Peek modes. */
+        private final CachePeekMode[] peekModes;
 
-        /** Cache name. */
-        private String cacheName;
+        /** Context. */
+        private final GridCacheContext ctx;
 
-        /** Primary only flag. */
-        private boolean primaryOnly;
+        /** Nodes. */
+        private final Collection<ClusterNode> nodes;
 
-        /** Injected grid instance. */
-        @IgniteInstanceResource
-        private Ignite ignite;
+        /** Max retries count before issuing an error. */
+        private int retries = 32;
 
         /**
-         * Empty constructor for serialization.
+         * @param peekModes Peek modes.
          */
-        public GlobalSizeCallable() {
-            // No-op.
+        public SizeFuture(CachePeekMode[] peekModes, GridCacheContext ctx, Collection<ClusterNode> nodes) {
+            this.peekModes = peekModes;
+            this.ctx = ctx;
+            this.nodes = nodes;
+
+            init();
         }
 
         /**
-         * @param cacheName Cache name.
-         * @param primaryOnly Primary only flag.
+         * Init.
          */
-        private GlobalSizeCallable(String cacheName, boolean primaryOnly) {
-            this.cacheName = cacheName;
-            this.primaryOnly = primaryOnly;
-        }
+        private void init() {
+            IgniteInternalFuture<Collection<Integer>> fut =
+                ctx.closures().broadcastNoFailover(new SizeCallable(ctx.name(), peekModes), null, nodes);
 
-        /** {@inheritDoc} */
-        @Override public Integer apply(Object o) {
-            IgniteInternalCache<Object, Object> cache = ((IgniteEx)ignite).cachex(cacheName);
+            fut.listen(new IgniteInClosure<IgniteInternalFuture<Collection<Integer>>>() {
+                @Override public void apply(IgniteInternalFuture<Collection<Integer>> fut) {
+                    try {
+                        Collection<Integer> res = fut.get();
 
-            return primaryOnly ? cache.primarySize() : cache.size();
-        }
+                        int size = 0;
 
-        /** {@inheritDoc} */
-        @Override public void writeExternal(ObjectOutput out) throws IOException {
-            U.writeString(out, cacheName);
-            out.writeBoolean(primaryOnly);
-        }
+                        for (Integer locSize : res)
+                            size += locSize;
 
-        /** {@inheritDoc} */
-        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException
{
-            cacheName = U.readString(in);
-            primaryOnly = in.readBoolean();
+                        onDone(size);
+                    }
+                    catch (IgniteCheckedException e) {
+                        if (X.hasCause(e, ClusterTopologyException.class, NullPointerException.class)) {
+                            if (retries-- > 0)
+                                init();
+                            else
+                                onDone(e);
+                        }
+                        else
+                            onDone(e);
+                    }
+                }
+            });
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b7e7f239/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
index f6d686c..0f8b36b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
@@ -852,7 +852,8 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
                 }
                 catch (IgniteException e) {
                     if (X.hasCause(e, GridInternalException.class) ||
-                        X.hasCause(e, IgfsOutOfSpaceException.class)) {
+                        X.hasCause(e, IgfsOutOfSpaceException.class) ||
+                        X.hasCause(e, ClusterTopologyException.class)) {
                         // Print internal exceptions only if debug is enabled.
                         if (log.isDebugEnabled())
                             U.error(log, "Failed to obtain remote job result policy for result
from " +

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b7e7f239/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheSizeTopologyChangedTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheSizeTopologyChangedTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheSizeTopologyChangedTest.java
new file mode 100644
index 0000000..c04461b
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheSizeTopologyChangedTest.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
+import static org.apache.ignite.cache.CacheMode.*;
+
+/**
+ *
+ */
+public class GridCacheSizeTopologyChangedTest extends GridCommonAbstractTest {
+    /** Ip finder. */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** Grids count */
+    private static final int GRIDS_CNT = 10;
+
+    /** Keys count */
+    private static final int KEYS_CNT = 1_000_000;
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+
+        discoSpi.setIpFinder(IP_FINDER);
+
+        cfg.setDiscoverySpi(discoSpi);
+
+        CacheConfiguration ccfg = new CacheConfiguration();
+
+        ccfg.setAtomicityMode(ATOMIC);
+
+        ccfg.setCacheMode(PARTITIONED);
+
+        ccfg.setBackups(1);
+
+        cfg.setCacheConfiguration(defaultCacheConfiguration());
+
+        return cfg;
+    }
+
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+    }
+
+    /**
+     * @throws Exception if failed
+     */
+    public void testCacheSize() throws Exception {
+        Ignite g0 = startGrid(0);
+
+        final AtomicBoolean canceled = new AtomicBoolean();
+
+        final Random rnd = new Random();
+
+        final boolean[] status = new boolean[GRIDS_CNT];
+
+        IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                while(!canceled.get()) {
+                    int idx = rnd.nextInt(GRIDS_CNT);
+
+                    if (idx > 0) {
+                        boolean state = status[idx];
+
+                        if (state) {
+                            System.out.println("!!! STOP GRID: " + idx);
+                            stopGrid(idx);
+                        }
+                        else {
+                            System.out.println("!!! START GRID:" + idx);
+
+                            startGrid(idx);
+                        }
+
+                        status[idx] = !state;
+
+                        U.sleep(1000);
+                    }
+                }
+                return null;
+            }
+        });
+
+        IgniteCache<Integer, Integer> cache = null;
+
+        try {
+            cache = g0.cache(null);
+
+            for (int i = 0; i < KEYS_CNT; i++) {
+                cache.put(i, 0);
+
+                int size = cache.size();
+
+                if (i % 1000 == 0)
+                    System.out.println("!!! Keys added: " + i + ", size: " + size);
+            }
+
+            canceled.set(true);
+        }
+        catch (Exception e) {
+            System.out.println("!!! Terminated abnormally");
+            e.printStackTrace();
+        }
+        finally {
+            canceled.set(true);
+
+            if (cache != null)
+                assertEquals(KEYS_CNT, cache.size());
+
+            fut.get();
+        }
+    }
+
+}


Mime
View raw message