ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [3/3] ignite git commit: IGNITE-2265: WIP (3)
Date Sat, 26 Dec 2015 10:30:47 GMT
IGNITE-2265: WIP (3)


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ecdba1fe
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ecdba1fe
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ecdba1fe

Branch: refs/heads/ignite-2265
Commit: ecdba1fe70d2046b1286b74d5dd7a06c525740aa
Parents: 47d2fa3
Author: vozerov-gridgain <vozerov@gridgain.com>
Authored: Sat Dec 26 13:29:09 2015 +0300
Committer: vozerov-gridgain <vozerov@gridgain.com>
Committed: Sat Dec 26 13:29:09 2015 +0300

----------------------------------------------------------------------
 ...arOptimisticSerializableTxPrepareFuture.java | 88 +++++++++++---------
 .../near/GridNearOptimisticTxPrepareFuture.java | 48 +++++++----
 2 files changed, 79 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/ecdba1fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
index 50c5263..2090e04 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
@@ -23,7 +23,8 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.cluster.ClusterTopologyException;
@@ -366,7 +367,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
         for (GridDistributedTxMapping m : mappings.values()) {
             assert !m.empty();
 
-            add(new MiniFuture(m));
+            add(new MiniFuture(this, m));
         }
 
         Collection<IgniteInternalFuture<?>> futs = (Collection)futures();
@@ -410,7 +411,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
      * @return {@code True} if skip future during remap.
      */
     private boolean skipFuture(boolean remap, IgniteInternalFuture<?> fut) {
-        return !(isMini(fut)) || (remap && ((MiniFuture)fut).rcvRes.get());
+        return !(isMini(fut)) || (remap && (((MiniFuture)fut).rcvRes == 1));
     }
 
     /**
@@ -630,7 +631,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
     /**
      *
      */
-    private class ClientRemapFuture extends GridCompoundFuture<GridNearTxPrepareResponse, Boolean> {
+    private static class ClientRemapFuture extends GridCompoundFuture<GridNearTxPrepareResponse, Boolean> {
         /** */
         private boolean remap = true;
 
@@ -660,24 +661,34 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
     /**
      *
      */
-    private class MiniFuture extends GridFutureAdapter<GridNearTxPrepareResponse> {
+    private static class MiniFuture extends GridFutureAdapter<GridNearTxPrepareResponse> {
         /** */
         private static final long serialVersionUID = 0L;
 
+        /** Receive result flag updater. */
+        private static AtomicIntegerFieldUpdater<MiniFuture> RCV_RES_UPD =
+            AtomicIntegerFieldUpdater.newUpdater(MiniFuture.class, "rcvRes");
+
         /** */
         private final IgniteUuid futId = IgniteUuid.randomUuid();
 
+        /** Parent future. */
+        private final GridNearOptimisticSerializableTxPrepareFuture parent;
+
         /** Keys. */
         @GridToStringInclude
         private GridDistributedTxMapping m;
 
         /** Flag to signal some result being processed. */
-        private AtomicBoolean rcvRes = new AtomicBoolean(false);
+        @SuppressWarnings("UnusedDeclaration")
+        private volatile int rcvRes;
 
         /**
+         * @param parent Parent future.
          * @param m Mapping.
          */
-        MiniFuture(GridDistributedTxMapping m) {
+        MiniFuture(GridNearOptimisticSerializableTxPrepareFuture parent, GridDistributedTxMapping m) {
+            this.parent = parent;
             this.m = m;
         }
 
@@ -706,8 +717,8 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
          * @param e Error.
          */
         void onResult(Throwable e) {
-            if (rcvRes.compareAndSet(false, true)) {
-                onError(m, e);
+            if (RCV_RES_UPD.compareAndSet(this, 0, 1)) {
+                parent.onError(m, e);
 
                 if (log.isDebugEnabled())
                     log.debug("Failed to get future result [fut=" + this + ", err=" + e + ']');
@@ -717,7 +728,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
             }
             else
                 U.warn(log, "Received error after another result has been processed [fut=" +
-                    GridNearOptimisticSerializableTxPrepareFuture.this + ", mini=" + this + ']', e);
+                    parent + ", mini=" + this + ']', e);
         }
 
         /**
@@ -727,11 +738,11 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
             if (isDone())
                 return;
 
-            if (rcvRes.compareAndSet(false, true)) {
+            if (RCV_RES_UPD.compareAndSet(this, 0, 1)) {
                 if (log.isDebugEnabled())
                     log.debug("Remote node left grid while sending or waiting for reply (will not retry): " + this);
 
-                onError(null, e);
+                parent.onError(null, e);
 
                 onDone(e);
             }
@@ -745,35 +756,35 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
             if (isDone())
                 return;
 
-            if (rcvRes.compareAndSet(false, true)) {
+            if (RCV_RES_UPD.compareAndSet(this, 0, 1)) {
                 if (res.error() != null) {
                     // Fail the whole compound future.
-                    onError(m, res.error());
+                    parent.onError(m, res.error());
 
                     onDone(res.error());
                 }
                 else {
                     if (res.clientRemapVersion() != null) {
-                        assert cctx.kernalContext().clientNode();
+                        assert parent.cctx.kernalContext().clientNode();
                         assert m.clientFirst();
 
-                        tx.removeMapping(m.node().id());
+                        parent.tx.removeMapping(m.node().id());
 
                         ClientRemapFuture remapFut0 = null;
 
-                        synchronized (GridNearOptimisticSerializableTxPrepareFuture.this) {
-                            if (remapFut == null) {
-                                remapFut = new ClientRemapFuture();
+                        synchronized (parent) {
+                            if (parent.remapFut == null) {
+                                parent.remapFut = new ClientRemapFuture();
 
-                                remapFut0 = remapFut;
+                                remapFut0 = parent.remapFut;
                             }
                         }
 
                         if (remapFut0 != null) {
-                            Collection<IgniteInternalFuture<?>> futs = (Collection)futures();
+                            Collection<IgniteInternalFuture<?>> futs = (Collection)parent.futures();
 
                             for (IgniteInternalFuture<?> fut : futs) {
-                                if (isMini(fut) && fut != this)
+                                if (parent.isMini(fut) && fut != this)
                                     remapFut0.add((MiniFuture)fut);
                             }
 
@@ -783,22 +794,22 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
                                 @Override public void apply(IgniteInternalFuture<Boolean> remapFut0) {
                                     try {
                                         IgniteInternalFuture<?> affFut =
-                                            cctx.exchange().affinityReadyFuture(res.clientRemapVersion());
+                                            parent.cctx.exchange().affinityReadyFuture(res.clientRemapVersion());
 
                                         if (affFut == null)
                                             affFut = new GridFinishedFuture<Object>();
 
-                                        if (remapFut.get()) {
+                                        if (parent.remapFut.get()) {
                                             if (log.isDebugEnabled()) {
                                                 log.debug("Will remap client tx [" +
-                                                    "fut=" + GridNearOptimisticSerializableTxPrepareFuture.this +
+                                                    "fut=" + parent +
                                                     ", topVer=" + res.topologyVersion() + ']');
                                             }
 
-                                            synchronized (GridNearOptimisticSerializableTxPrepareFuture.this) {
-                                                assert remapFut0 == remapFut;
+                                            synchronized (parent) {
+                                                assert remapFut0 == parent.remapFut;
 
-                                                remapFut = null;
+                                                parent.remapFut = null;
                                             }
 
                                             affFut.listen(new CI1<IgniteInternalFuture<?>>() {
@@ -809,9 +820,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
                                                         remap(res);
                                                     }
                                                     catch (IgniteCheckedException e) {
-                                                        ERR_UPD.compareAndSet(
-                                                            GridNearOptimisticSerializableTxPrepareFuture.this,
-                                                            null, e);
+                                                        ERR_UPD.compareAndSet(parent, null, e);
 
                                                         onDone(e);
                                                     }
@@ -824,8 +833,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
 
                                             err0.retryReadyFuture(affFut);
 
-                                            ERR_UPD.compareAndSet(GridNearOptimisticSerializableTxPrepareFuture.this,
-                                                null, err0);
+                                            ERR_UPD.compareAndSet(parent, null, err0);
 
                                             onDone(err0);
                                         }
@@ -833,11 +841,10 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
                                     catch (IgniteCheckedException e) {
                                         if (log.isDebugEnabled()) {
                                             log.debug("Prepare failed, will not remap tx: " +
-                                                GridNearOptimisticSerializableTxPrepareFuture.this);
+                                                parent);
                                         }
 
-                                        ERR_UPD.compareAndSet(GridNearOptimisticSerializableTxPrepareFuture.this,
-                                            null, e);
+                                        ERR_UPD.compareAndSet(parent, null, e);
 
                                         onDone(e);
                                     }
@@ -848,10 +855,10 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
                             onDone(res);
                     }
                     else {
-                        onPrepareResponse(m, res);
+                        parent.onPrepareResponse(m, res);
 
                         // Finish this mini future (need result only on client node).
-                        onDone(cctx.kernalContext().clientNode() ? res : null);
+                        onDone(parent.cctx.kernalContext().clientNode() ? res : null);
                     }
                 }
             }
@@ -861,8 +868,9 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
          * @param res Response.
          */
         private void remap(final GridNearTxPrepareResponse res) {
-            prepareOnTopology(true, new Runnable() {
-                @Override public void run() {
+            parent.prepareOnTopology(true, new Runnable() {
+                @Override
+                public void run() {
                     onDone(res);
                 }
             });

http://git-wip-us.apache.org/repos/asf/ignite/blob/ecdba1fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
index de19c95..bae0327 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
@@ -24,7 +24,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Queue;
 import java.util.UUID;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.cluster.ClusterTopologyException;
@@ -448,7 +449,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
                 }
             }
 
-            final MiniFuture fut = new MiniFuture(m, mappings);
+            final MiniFuture fut = new MiniFuture(this, m, mappings);
 
             req.miniId(fut.futureId());
 
@@ -611,10 +612,17 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
     /**
      *
      */
-    private class MiniFuture extends GridFutureAdapter<GridNearTxPrepareResponse> {
+    private static class MiniFuture extends GridFutureAdapter<GridNearTxPrepareResponse> {
         /** */
         private static final long serialVersionUID = 0L;
 
+        /** Receive result flag updater. */
+        private static AtomicIntegerFieldUpdater<MiniFuture> RCV_RES_UPD =
+            AtomicIntegerFieldUpdater.newUpdater(MiniFuture.class, "rcvRes");
+
+        /** Parent future. */
+        private final GridNearOptimisticTxPrepareFuture parent;
+
         /** */
         private final IgniteUuid futId = IgniteUuid.randomUuid();
 
@@ -623,16 +631,20 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
         private GridDistributedTxMapping m;
 
         /** Flag to signal some result being processed. */
-        private AtomicBoolean rcvRes = new AtomicBoolean(false);
+        @SuppressWarnings("UnusedDeclaration")
+        private volatile int rcvRes;
 
         /** Mappings to proceed prepare. */
         private Queue<GridDistributedTxMapping> mappings;
 
         /**
+         * @param parent Parent.
          * @param m Mapping.
          * @param mappings Queue of mappings to proceed with.
          */
-        MiniFuture(GridDistributedTxMapping m, Queue<GridDistributedTxMapping> mappings) {
+        MiniFuture(GridNearOptimisticTxPrepareFuture parent, GridDistributedTxMapping m,
+            Queue<GridDistributedTxMapping> mappings) {
+            this.parent = parent;
             this.m = m;
             this.mappings = mappings;
         }
@@ -662,7 +674,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
          * @param e Error.
          */
         void onResult(Throwable e) {
-            if (rcvRes.compareAndSet(false, true)) {
+            if (RCV_RES_UPD.compareAndSet(this, 0, 1)) {
                 if (log.isDebugEnabled())
                     log.debug("Failed to get future result [fut=" + this + ", err=" + e + ']');
 
@@ -671,7 +683,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
             }
             else
                 U.warn(log, "Received error after another result has been processed [fut=" +
-                    GridNearOptimisticTxPrepareFuture.this + ", mini=" + this + ']', e);
+                    parent + ", mini=" + this + ']', e);
         }
 
         /**
@@ -681,13 +693,13 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
             if (isDone())
                 return;
 
-            if (rcvRes.compareAndSet(false, true)) {
+            if (RCV_RES_UPD.compareAndSet(this, 0, 1)) {
                 if (log.isDebugEnabled())
                     log.debug("Remote node left grid while sending or waiting for reply (will not retry): " + this);
 
                 // Fail the whole future (make sure not to remap on different primary node
                 // to prevent multiple lock coordinators).
-                onError(e);
+                parent.onError(e);
             }
         }
 
@@ -695,21 +707,23 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
          * @param nodeId Failed node ID.
          * @param res Result callback.
          */
+        @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
         void onResult(UUID nodeId, final GridNearTxPrepareResponse res) {
             if (isDone())
                 return;
 
-            if (rcvRes.compareAndSet(false, true)) {
+            if (RCV_RES_UPD.compareAndSet(this, 0, 1)) {
                 if (res.error() != null) {
                     // Fail the whole compound future.
-                    onError(res.error());
+                    parent.onError(res.error());
                 }
                 else {
                     if (res.clientRemapVersion() != null) {
-                        assert cctx.kernalContext().clientNode();
+                        assert parent.cctx.kernalContext().clientNode();
                         assert m.clientFirst();
 
-                        IgniteInternalFuture<?> affFut = cctx.exchange().affinityReadyFuture(res.clientRemapVersion());
+                        IgniteInternalFuture<?> affFut =
+                            parent.cctx.exchange().affinityReadyFuture(res.clientRemapVersion());
 
                         if (affFut != null && !affFut.isDone()) {
                             affFut.listen(new CI1<IgniteInternalFuture<?>>() {
@@ -728,11 +742,11 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
                         else
                             remap();
                     } else {
-                        onPrepareResponse(m, res);
+                        parent.onPrepareResponse(m, res);
 
                         // Proceed prepare before finishing mini future.
                         if (mappings != null)
-                            proceedPrepare(mappings);
+                            parent.proceedPrepare(mappings);
 
                         // Finish this mini future.
                         onDone((GridNearTxPrepareResponse)null);
@@ -745,10 +759,10 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
          *
          */
         private void remap() {
-            prepareOnTopology(true, new Runnable() {
+            parent.prepareOnTopology(true, new Runnable() {
                 @Override
                 public void run() {
-                    onDone((GridNearTxPrepareResponse)null);
+                    onDone((GridNearTxPrepareResponse) null);
                 }
             });
         }


Mime
View raw message