ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ntikho...@apache.org
Subject [07/12] ignite git commit: IGNITE-2236: Optimized GridCompoundFuture: - Listener is "inlined" into the class; - Removed "ignoreChildFailures" field.
Date Thu, 21 Jan 2016 09:04:52 GMT
IGNITE-2236: Optimized GridCompoundFuture:
- Listener is "inlined" into the class;
- Removed "ignoreChildFailures" field.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1c302e40
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1c302e40
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1c302e40

Branch: refs/heads/ignite-gg-10837
Commit: 1c302e401b90928d48370757518e505383eb46bf
Parents: 27c9064
Author: vozerov-gridgain <vozerov@gridgain.com>
Authored: Wed Jan 20 17:17:01 2016 +0300
Committer: vozerov-gridgain <vozerov@gridgain.com>
Committed: Wed Jan 20 17:17:01 2016 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheMvccManager.java  |  43 ++-
 .../distributed/GridCacheTxRecoveryFuture.java  |   2 +-
 .../dht/CacheDistributedGetFutureAdapter.java   |   2 +-
 .../cache/distributed/dht/GridDhtGetFuture.java |   2 +-
 .../distributed/dht/GridDhtLockFuture.java      |   2 +-
 .../distributed/dht/GridDhtTxFinishFuture.java  |   2 +-
 .../colocated/GridDhtColocatedLockFuture.java   |   2 +-
 .../distributed/near/GridNearLockFuture.java    |   2 +-
 ...arOptimisticSerializableTxPrepareFuture.java |  47 ++-
 .../GridNearPessimisticTxPrepareFuture.java     |   6 +-
 .../near/GridNearTxFinishFuture.java            |  29 +-
 .../service/GridServiceProcessor.java           |  17 +-
 .../util/future/GridCompoundFuture.java         | 314 +++++--------------
 .../util/future/GridCompoundIdentityFuture.java |   6 +-
 .../internal/util/future/GridFutureAdapter.java |   2 +
 .../ignite/testframework/GridTestUtils.java     |   7 +-
 16 files changed, 195 insertions(+), 290 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/1c302e40/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
index dbc6992..c7d1f62 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
@@ -17,17 +17,6 @@
 
 package org.apache.ignite.internal.processors.cache;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ConcurrentMap;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.events.DiscoveryEvent;
@@ -63,6 +52,18 @@ import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
 import org.jsr166.ConcurrentLinkedDeque8;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
+
 import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
 import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
 import static org.apache.ignite.internal.util.GridConcurrentFactory.newMap;
@@ -982,9 +983,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter
{
      */
     @SuppressWarnings("unchecked")
     public IgniteInternalFuture<?> finishAtomicUpdates(AffinityTopologyVersion topVer)
{
-        GridCompoundFuture<Object, Object> res = new GridCompoundFuture<>();
-
-        res.ignoreChildFailures(ClusterTopologyCheckedException.class, CachePartialUpdateCheckedException.class);
+        GridCompoundFuture<Object, Object> res = new FinishAtomicUpdateFuture();
 
         for (GridCacheAtomicFuture<?> fut : atomicFuts.values()) {
             IgniteInternalFuture<Void> complete = fut.completeFuture(topVer);
@@ -1221,4 +1220,20 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter
{
                 return S.toString(FinishLockFuture.class, this, super.toString());
         }
     }
+
+    /**
+     * Finish atomic update future.
+     */
+    private static class FinishAtomicUpdateFuture extends GridCompoundFuture<Object, Object>
{
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** {@inheritDoc} */
+        @Override protected boolean ignoreFailure(Throwable err) {
+            Class cls = err.getClass();
+
+            return ClusterTopologyCheckedException.class.isAssignableFrom(cls) ||
+                CachePartialUpdateCheckedException.class.isAssignableFrom(cls);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/1c302e40/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java
index 1648de0..5a4a1ef 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java
@@ -90,7 +90,7 @@ public class GridCacheTxRecoveryFuture extends GridCompoundIdentityFuture<Boolea
          UUID failedNodeId,
         Map<UUID, Collection<UUID>> txNodes)
     {
-        super(cctx.kernalContext(), CU.boolReducer());
+        super(CU.boolReducer());
 
         this.cctx = cctx;
         this.tx = tx;

http://git-wip-us.apache.org/repos/asf/ignite/blob/1c302e40/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
index 40eec63..7efaf49 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
@@ -132,7 +132,7 @@ public abstract class CacheDistributedGetFutureAdapter<K, V> extends
GridCompoun
         boolean needVer,
         boolean keepCacheObjects
     ) {
-        super(cctx.kernalContext(), CU.<K, V>mapsReducer(keys.size()));
+        super(CU.<K, V>mapsReducer(keys.size()));
 
         assert !F.isEmpty(keys);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/1c302e40/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
index e410228..cb8c842 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
@@ -137,7 +137,7 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
         @Nullable IgniteCacheExpiryPolicy expiryPlc,
         boolean skipVals
     ) {
-        super(cctx.kernalContext(), CU.<GridCacheEntryInfo>collectionsReducer());
+        super(CU.<GridCacheEntryInfo>collectionsReducer());
 
         assert reader != null;
         assert !F.isEmpty(keys);

http://git-wip-us.apache.org/repos/asf/ignite/blob/1c302e40/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
index 1c3e052..07755e0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
@@ -194,7 +194,7 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
         CacheEntryPredicate[] filter,
         boolean skipStore,
         boolean keepBinary) {
-        super(cctx.kernalContext(), CU.boolReducer());
+        super(CU.boolReducer());
 
         assert nearNodeId != null;
         assert nearLockVer != null;

http://git-wip-us.apache.org/repos/asf/ignite/blob/1c302e40/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
index 0e5db05..8c295ce 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
@@ -99,7 +99,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
      * @param commit Commit flag.
      */
     public GridDhtTxFinishFuture(GridCacheSharedContext<K, V> cctx, GridDhtTxLocalAdapter
tx, boolean commit) {
-        super(cctx.kernalContext(), F.<IgniteInternalTx>identityReducer(tx));
+        super(F.<IgniteInternalTx>identityReducer(tx));
 
         this.cctx = cctx;
         this.tx = tx;

http://git-wip-us.apache.org/repos/asf/ignite/blob/1c302e40/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
index cfeee4b..e4c6b71 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
@@ -169,7 +169,7 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
         CacheEntryPredicate[] filter,
         boolean skipStore,
         boolean keepBinary) {
-        super(cctx.kernalContext(), CU.boolReducer());
+        super(CU.boolReducer());
 
         assert keys != null;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/1c302e40/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
index 55c5ab6..5d4fc01 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
@@ -177,7 +177,7 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
         CacheEntryPredicate[] filter,
         boolean skipStore,
         boolean keepBinary) {
-        super(cctx.kernalContext(), CU.boolReducer());
+        super(CU.boolReducer());
 
         assert keys != null;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/1c302e40/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
index 2090e04..4f9f227 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
@@ -90,9 +90,11 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
         super(cctx, tx);
 
         assert tx.optimistic() && tx.serializable() : tx;
+    }
 
-        // Should wait for all mini futures completion before finishing tx.
-        ignoreChildFailures(IgniteCheckedException.class);
+    /** {@inheritDoc} */
+    @Override protected boolean ignoreFailure(Throwable err) {
+        return IgniteCheckedException.class.isAssignableFrom(err.getClass());
     }
 
     /** {@inheritDoc} */
@@ -629,32 +631,43 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
     }
 
     /**
-     *
+     * Client remap future.
      */
     private static class ClientRemapFuture extends GridCompoundFuture<GridNearTxPrepareResponse,
Boolean> {
         /** */
-        private boolean remap = true;
+        private static final long serialVersionUID = 0L;
 
         /**
-         *
+         * Constructor.
          */
         public ClientRemapFuture() {
-            super();
+            super(new ClientRemapFutureReducer());
+        }
+    }
 
-            reducer(new IgniteReducer<GridNearTxPrepareResponse, Boolean>() {
-                @Override public boolean collect(GridNearTxPrepareResponse res) {
-                    assert res != null;
+    /**
+     * Client remap future reducer.
+     */
+    private static class ClientRemapFutureReducer implements IgniteReducer<GridNearTxPrepareResponse,
Boolean> {
+        /** */
+        private static final long serialVersionUID = 0L;
 
-                    if (res.clientRemapVersion() == null)
-                        remap = false;
+        /** Remap flag. */
+        private boolean remap = true;
 
-                    return true;
-                }
+        /** {@inheritDoc} */
+        @Override public boolean collect(@Nullable GridNearTxPrepareResponse res) {
+            assert res != null;
 
-                @Override public Boolean reduce() {
-                    return remap;
-                }
-            });
+            if (res.clientRemapVersion() == null)
+                remap = false;
+
+            return true;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Boolean reduce() {
+            return remap;
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/1c302e40/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
index 9ee9aea..8170008 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
@@ -63,9 +63,11 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
         super(cctx, tx);
 
         assert tx.pessimistic() : tx;
+    }
 
-        // Should wait for all mini futures completion before finishing tx.
-        ignoreChildFailures(IgniteCheckedException.class);
+    /** {@inheritDoc} */
+    @Override protected boolean ignoreFailure(Throwable err) {
+        return IgniteCheckedException.class.isAssignableFrom(err.getClass());
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/1c302e40/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
index 26e189b..3c33bc4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
@@ -28,7 +28,6 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.NodeStoppingException;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
@@ -107,7 +106,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
      * @param commit Commit flag.
      */
     public GridNearTxFinishFuture(GridCacheSharedContext<K, V> cctx, GridNearTxLocal
tx, boolean commit) {
-        super(cctx.kernalContext(), F.<IgniteInternalTx>identityReducer(tx));
+        super(F.<IgniteInternalTx>identityReducer(tx));
 
         this.cctx = cctx;
         this.tx = tx;
@@ -644,16 +643,30 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
                 if (f.getClass() == FinishMiniFuture.class) {
                     FinishMiniFuture fut = (FinishMiniFuture)f;
 
-                    return "FinishFuture[node=" + fut.node().id() +
-                        ", loc=" + fut.node().isLocal() +
-                        ", done=" + fut.isDone() + "]";
+                    ClusterNode node = fut.node();
+
+                    if (node != null) {
+                        return "FinishFuture[node=" + node.id() +
+                            ", loc=" + node.isLocal() +
+                            ", done=" + fut.isDone() + ']';
+                    }
+                    else {
+                        return "FinishFuture[node=null, done=" + fut.isDone() + ']';
+                    }
                 }
                 else if (f.getClass() == CheckBackupMiniFuture.class) {
                     CheckBackupMiniFuture fut = (CheckBackupMiniFuture)f;
 
-                    return "CheckBackupFuture[node=" + fut.node().id() +
-                        ", loc=" + fut.node().isLocal() +
-                        ", done=" + f.isDone() + "]";
+                    ClusterNode node = fut.node();
+
+                    if (node != null) {
+                        return "CheckBackupFuture[node=" + node.id() +
+                            ", loc=" + node.isLocal() +
+                            ", done=" + f.isDone() + "]";
+                    }
+                    else {
+                        return "CheckBackupFuture[node=null, done=" + f.isDone() + "]";
+                    }
                 }
                 else if (f.getClass() == CheckRemoteTxMiniFuture.class) {
                     CheckRemoteTxMiniFuture fut = (CheckRemoteTxMiniFuture)f;

http://git-wip-us.apache.org/repos/asf/ignite/blob/1c302e40/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index 6b05edd..2841083 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@ -515,10 +515,10 @@ public class GridServiceProcessor extends GridProcessorAdapter {
      */
     @SuppressWarnings("unchecked")
     public IgniteInternalFuture<?> cancelAll() {
-        Collection<IgniteInternalFuture<?>> futs = new ArrayList<>();
-
         Iterator<Cache.Entry<Object, Object>> it = serviceEntries(ServiceDeploymentPredicate.INSTANCE);
 
+        GridCompoundFuture res = null;
+
         while (it.hasNext()) {
             Cache.Entry<Object, Object> e = it.next();
 
@@ -527,11 +527,20 @@ public class GridServiceProcessor extends GridProcessorAdapter {
 
             GridServiceDeployment dep = (GridServiceDeployment)e.getValue();
 
+            if (res == null)
+                res = new GridCompoundFuture<>();
+
             // Cancel each service separately.
-            futs.add(cancel(dep.configuration().getName()));
+            res.add(cancel(dep.configuration().getName()));
         }
 
-        return futs.isEmpty() ? new GridFinishedFuture<>() : new GridCompoundFuture(null,
futs);
+        if (res != null) {
+            res.markInitialized();
+
+            return res;
+        }
+        else
+            return new GridFinishedFuture<>();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/1c302e40/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java
index 4b2461e..c382497 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java
@@ -17,15 +17,11 @@
 
 package org.apache.ignite.internal.util.future;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.IgniteFutureCancelledCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.transactions.IgniteTxOptimisticCheckedException;
-import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.C1;
 import org.apache.ignite.internal.util.typedef.F;
@@ -35,55 +31,48 @@ import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgniteReducer;
 import org.jetbrains.annotations.Nullable;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+
 /**
  * Future composed of multiple inner futures.
  */
-public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> {
+public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> implements
IgniteInClosure<IgniteInternalFuture<T>> {
     /** */
     private static final long serialVersionUID = 0L;
 
-    /** */
-    private static final int INITED = 0b1;
+    /** Initialization flag. */
+    private static final int INIT_FLAG = 0x1;
 
-    /** */
-    private static final AtomicIntegerFieldUpdater<GridCompoundFuture> flagsUpd =
-        AtomicIntegerFieldUpdater.newUpdater(GridCompoundFuture.class, "flags");
+    /** Flags updater. */
+    private static final AtomicIntegerFieldUpdater<GridCompoundFuture> FLAGS_UPD =
+        AtomicIntegerFieldUpdater.newUpdater(GridCompoundFuture.class, "initFlag");
 
-    /** */
-    private static final AtomicIntegerFieldUpdater<GridCompoundFuture> lsnrCallsUpd
=
+    /** Listener calls updater. */
+    private static final AtomicIntegerFieldUpdater<GridCompoundFuture> LSNR_CALLS_UPD
=
         AtomicIntegerFieldUpdater.newUpdater(GridCompoundFuture.class, "lsnrCalls");
 
     /** Futures. */
     protected final ArrayList<IgniteInternalFuture<T>> futs = new ArrayList<>();
 
-    /** */
-    @GridToStringExclude
-    private final Listener lsnr = new Listener();
-
     /** Reducer. */
     @GridToStringInclude
-    private IgniteReducer<T, R> rdc;
-
-    /** Exceptions to ignore. */
-    private Class<? extends Throwable>[] ignoreChildFailures;
+    private final IgniteReducer<T, R> rdc;
 
-    /**
-     * Updated via {@link #flagsUpd}.
-     *
-     * @see #INITED
-     */
+    /** Initialization flag. Updated via {@link #FLAGS_UPD}. */
     @SuppressWarnings("unused")
-    private volatile int flags;
+    private volatile int initFlag;
 
-    /** Updated via {@link #lsnrCallsUpd}. */
+    /** Listener calls. Updated via {@link #LSNR_CALLS_UPD}. */
     @SuppressWarnings("unused")
     private volatile int lsnrCalls;
 
     /**
-     *
+     * Default constructor.
      */
     public GridCompoundFuture() {
-        // No-op.
+        this(null);
     }
 
     /**
@@ -93,19 +82,59 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R>
{
         this.rdc = rdc;
     }
 
-    /**
-     * @param rdc Reducer to add.
-     * @param futs Futures to add.
-     */
-    public GridCompoundFuture(
-        @Nullable IgniteReducer<T, R> rdc,
-        @Nullable Iterable<IgniteInternalFuture<T>> futs
-    ) {
-        this.rdc = rdc;
+    /** {@inheritDoc} */
+    @Override public void apply(IgniteInternalFuture<T> fut) {
+        try {
+            T t = fut.get();
+
+            try {
+                if (rdc != null && !rdc.collect(t))
+                    onDone(rdc.reduce());
+            }
+            catch (RuntimeException e) {
+                U.error(null, "Failed to execute compound future reducer: " + this, e);
 
-        addAll(futs);
+                // Exception in reducer is a bug, so we bypass checkComplete here.
+                onDone(e);
+            }
+            catch (AssertionError e) {
+                U.error(null, "Failed to execute compound future reducer: " + this, e);
 
-        markInitialized();
+                // Bypass checkComplete because need to rethrow.
+                onDone(e);
+
+                throw e;
+            }
+        }
+        catch (IgniteTxOptimisticCheckedException | IgniteFutureCancelledCheckedException
|
+            ClusterTopologyCheckedException e) {
+            if (!ignoreFailure(e))
+                onDone(e);
+        }
+        catch (IgniteCheckedException e) {
+            if (!ignoreFailure(e)) {
+                U.error(null, "Failed to execute compound future reducer: " + this, e);
+
+                onDone(e);
+            }
+        }
+        catch (RuntimeException e) {
+            U.error(null, "Failed to execute compound future reducer: " + this, e);
+
+            onDone(e);
+        }
+        catch (AssertionError e) {
+            U.error(null, "Failed to execute compound future reducer: " + this, e);
+
+            // Bypass checkComplete because need to rethrow.
+            onDone(e);
+
+            throw e;
+        }
+
+        LSNR_CALLS_UPD.incrementAndGet(GridCompoundFuture.this);
+
+        checkComplete();
     }
 
     /** {@inheritDoc} */
@@ -125,43 +154,20 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R>
{
      *
      * @return Collection of futures.
      */
-    private Collection<IgniteInternalFuture<T>> futures(boolean pending) {
+    public Collection<IgniteInternalFuture<T>> futures() {
         synchronized (futs) {
-            Collection<IgniteInternalFuture<T>> res = new ArrayList<>(futs.size());
-
-            for (IgniteInternalFuture<T> fut : futs) {
-                if (!pending || !fut.isDone())
-                    res.add(fut);
-            }
-
-            return res;
+            return new ArrayList<>(futs);
         }
     }
 
     /**
-     * Gets collection of futures.
-     *
-     * @return Collection of futures.
-     */
-    public Collection<IgniteInternalFuture<T>> futures() {
-        return futures(false);
-    }
-
-    /**
-     * Gets pending (unfinished) futures.
+     * Checks if this compound future should ignore this particular exception.
      *
-     * @return Pending futures.
-     */
-    public Collection<IgniteInternalFuture<T>> pending() {
-        return futures(true);
-    }
-
-    /**
-     * @param ignoreChildFailures Flag indicating whether compound future should ignore child
futures failures.
+     * @param err Exception to check.
+     * @return {@code True} if this error should be ignored.
      */
-    @SafeVarargs
-    public final void ignoreChildFailures(Class<? extends Throwable>... ignoreChildFailures)
{
-        this.ignoreChildFailures = ignoreChildFailures;
+    protected boolean ignoreFailure(Throwable err) {
+        return false;
     }
 
     /**
@@ -187,14 +193,6 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R>
{
     }
 
     /**
-     * @return {@code True} if this future was initialized. Initialization happens when
-     *      {@link #markInitialized()} method is called on future.
-     */
-    public boolean initialized() {
-        return flagSet(INITED);
-    }
-
-    /**
      * Adds a future to this compound future.
      *
      * @param fut Future to add.
@@ -206,7 +204,7 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R>
{
             futs.add(fut);
         }
 
-        fut.listen(lsnr);
+        fut.listen(this);
 
         if (isCancelled()) {
             try {
@@ -219,76 +217,18 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R>
{
     }
 
     /**
-     * Adds futures to this compound future.
-     *
-     * @param futs Futures to add.
-     */
-    @SafeVarargs
-    public final void addAll(@Nullable IgniteInternalFuture<T>... futs) {
-        addAll(F.asList(futs));
-    }
-
-    /**
-     * Adds futures to this compound future.
-     *
-     * @param futs Futures to add.
-     */
-    public void addAll(@Nullable Iterable<IgniteInternalFuture<T>> futs) {
-        if (futs != null) {
-            for (IgniteInternalFuture<T> fut : futs)
-                add(fut);
-        }
-    }
-
-    /**
-     * Gets optional reducer.
-     *
-     * @return Optional reducer.
-     */
-    @Nullable public IgniteReducer<T, R> reducer() {
-        return rdc;
-    }
-
-    /**
-     * Sets optional reducer.
-     *
-     * @param rdc Optional reducer.
-     */
-    public void reducer(@Nullable IgniteReducer<T, R> rdc) {
-        this.rdc = rdc;
-    }
-
-    /**
-     * @param flag Flag to CAS.
-     * @return {@code True} if CAS succeeds.
-     */
-    private boolean casFlag(int flag) {
-        for (;;) {
-            int flags0 = flags;
-
-            if ((flags0 & flag) != 0)
-                return false;
-
-            if (flagsUpd.compareAndSet(this, flags0, flags0 | flag))
-                return true;
-        }
-    }
-
-    /**
-     * @param flag Flag to check.
-     * @return {@code True} if set.
+     * @return {@code True} if this future was initialized. Initialization happens when
+     *      {@link #markInitialized()} method is called on future.
      */
-    private boolean flagSet(int flag) {
-        return (flags & flag) != 0;
+    public boolean initialized() {
+        return initFlag == INIT_FLAG;
     }
 
     /**
      * Mark this future as initialized.
      */
     public void markInitialized() {
-        if (casFlag(INITED))
-            // Check complete to make sure that we take care
-            // of all the ignored callbacks.
+        if (FLAGS_UPD.compareAndSet(this, 0, INIT_FLAG))
             checkComplete();
     }
 
@@ -296,7 +236,7 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R>
{
      * Check completeness of the future.
      */
     private void checkComplete() {
-        if (flagSet(INITED) && !isDone() && lsnrCalls == futuresSize()) {
+        if (initialized() && !isDone() && lsnrCalls == futuresSize()) {
             try {
                 onDone(rdc != null ? rdc.reduce() : null);
             }
@@ -324,26 +264,6 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R>
{
         }
     }
 
-    /**
-     * Checks if this compound future should ignore this particular exception.
-     *
-     * @param err Exception to check.
-     * @return {@code True} if this error should be ignored.
-     */
-    private boolean ignoreFailure(@Nullable Throwable err) {
-        if (err == null)
-            return true;
-
-        if (ignoreChildFailures != null) {
-            for (Class<? extends Throwable> ignoreCls : ignoreChildFailures) {
-                if (ignoreCls.isAssignableFrom(err.getClass()))
-                    return true;
-            }
-        }
-
-        return false;
-    }
-
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(GridCompoundFuture.class, this,
@@ -358,72 +278,4 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R>
{
                 })
         );
     }
-
-    /**
-     * Listener for futures.
-     */
-    private class Listener implements IgniteInClosure<IgniteInternalFuture<T>>
{
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /** {@inheritDoc} */
-        @Override public void apply(IgniteInternalFuture<T> fut) {
-            try {
-                T t = fut.get();
-
-                try {
-                    if (rdc != null && !rdc.collect(t))
-                        onDone(rdc.reduce());
-                }
-                catch (RuntimeException e) {
-                    U.error(null, "Failed to execute compound future reducer: " + this, e);
-
-                    // Exception in reducer is a bug, so we bypass checkComplete here.
-                    onDone(e);
-                }
-                catch (AssertionError e) {
-                    U.error(null, "Failed to execute compound future reducer: " + this, e);
-
-                    // Bypass checkComplete because need to rethrow.
-                    onDone(e);
-
-                    throw e;
-                }
-            }
-            catch (IgniteTxOptimisticCheckedException | IgniteFutureCancelledCheckedException
|
-                ClusterTopologyCheckedException e) {
-                if (!ignoreFailure(e))
-                    onDone(e);
-            }
-            catch (IgniteCheckedException e) {
-                if (!ignoreFailure(e)) {
-                    U.error(null, "Failed to execute compound future reducer: " + this, e);
-
-                    onDone(e);
-                }
-            }
-            catch (RuntimeException e) {
-                U.error(null, "Failed to execute compound future reducer: " + this, e);
-
-                onDone(e);
-            }
-            catch (AssertionError e) {
-                U.error(null, "Failed to execute compound future reducer: " + this, e);
-
-                // Bypass checkComplete because need to rethrow.
-                onDone(e);
-
-                throw e;
-            }
-
-            lsnrCallsUpd.incrementAndGet(GridCompoundFuture.this);
-
-            checkComplete();
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return "Compound future listener []";
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/1c302e40/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundIdentityFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundIdentityFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundIdentityFuture.java
index bb5abf2..4010ccd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundIdentityFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundIdentityFuture.java
@@ -17,13 +17,12 @@
 
 package org.apache.ignite.internal.util.future;
 
-import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.lang.IgniteReducer;
 import org.jetbrains.annotations.Nullable;
 
 /**
- * Future composed of multiple inner futures.
+ * Compound future with reducer which accepts and produces results of the same type.
  */
 public class GridCompoundIdentityFuture<T> extends GridCompoundFuture<T, T> {
     /** */
@@ -37,10 +36,9 @@ public class GridCompoundIdentityFuture<T> extends GridCompoundFuture<T,
T> {
     }
 
     /**
-     * @param ctx Context.
      * @param rdc Reducer.
      */
-    public GridCompoundIdentityFuture(GridKernalContext ctx, @Nullable IgniteReducer<T,
T> rdc) {
+    public GridCompoundIdentityFuture(@Nullable IgniteReducer<T, T> rdc) {
         super(rdc);
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/1c302e40/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
index a1720d5..c6a6a44 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
@@ -25,6 +25,7 @@ import org.apache.ignite.internal.IgniteFutureCancelledCheckedException;
 import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.internal.util.typedef.internal.S;
@@ -72,6 +73,7 @@ public class GridFutureAdapter<R> extends AbstractQueuedSynchronizer
implements
     private boolean ignoreInterrupts;
 
     /** */
+    @GridToStringExclude
     private IgniteInClosure<? super IgniteInternalFuture<R>> lsnr;
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/1c302e40/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
index 26a8994..c7940c6 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
@@ -648,10 +648,11 @@ public final class GridTestUtils {
         });
 
         // Compound future, that adds cancel() support to execution future.
-        GridCompoundFuture<Long, Long> compFut = new GridCompoundFuture<>();
+        GridCompoundFuture<Long, Long> compFut = new GridCompoundFuture<>(F.sumLongReducer());
+
+        compFut.add(cancelFut);
+        compFut.add(runFut);
 
-        compFut.addAll(cancelFut, runFut);
-        compFut.reducer(F.sumLongReducer());
         compFut.markInitialized();
 
         cancelFut.onDone();


Mime
View raw message