ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [44/50] ignite git commit: Merge remote-tracking branch 'remotes/origin/ignite-5578-locJoin' into ignite-5578
Date Thu, 13 Jul 2017 14:36:13 GMT
Merge remote-tracking branch 'remotes/origin/ignite-5578-locJoin' into ignite-5578

# Conflicts:
#	modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java
#	modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
#	modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
#	modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
#	modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/39835f6a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/39835f6a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/39835f6a

Branch: refs/heads/ignite-5578
Commit: 39835f6a2552d91e68864511f44d0a3e3b9973b4
Parents: 7970ff7
Author: sboikov <sboikov@gridgain.com>
Authored: Thu Jul 13 13:46:23 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Thu Jul 13 13:46:23 2017 +0300

----------------------------------------------------------------------
 .../ignite/codegen/MessageCodeGenerator.java    |  6 +-
 .../processors/cache/ExchangeContext.java       |  6 ++
 .../GridCachePartitionExchangeManager.java      |  2 +-
 .../GridDhtPartitionsExchangeFuture.java        | 84 +++++++++-----------
 .../preloader/GridDhtPartitionsFullMessage.java | 16 +++-
 5 files changed, 64 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/39835f6a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
----------------------------------------------------------------------
diff --git a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
index 70672b0..f81fe7f 100644
--- a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
+++ b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
@@ -45,6 +45,7 @@ import org.apache.ignite.internal.GridDirectMap;
 import org.apache.ignite.internal.GridDirectTransient;
 import org.apache.ignite.internal.IgniteCodeGeneratingFail;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersionEx;
 import org.apache.ignite.internal.util.typedef.internal.SB;
@@ -171,14 +172,15 @@ public class MessageCodeGenerator {
 
 //        gen.generateAll(true);
 
-        gen.generateAndWrite(GridDhtPartitionsFullMessage.class);
+//        gen.generateAndWrite(GridDhtPartitionsFullMessage.class);
+//        gen.generateAndWrite(GridDhtPartitionsSingleMessage.class);
 
 //        gen.generateAndWrite(GridNearAtomicUpdateRequest.class);
 
 //        gen.generateAndWrite(GridMessageCollection.class);
 //        gen.generateAndWrite(DataStreamerEntry.class);
 
-//        gen.generateAndWrite(GridDistributedLockRequest.class);
+//        gen.generateAndWrite(GridDhtPartitionsFullMessage.class);
 //        gen.generateAndWrite(GridDistributedLockResponse.class);
 //        gen.generateAndWrite(GridNearLockRequest.class);
 //        gen.generateAndWrite(GridNearLockResponse.class);

http://git-wip-us.apache.org/repos/asf/ignite/blob/39835f6a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java
index cac88be..c3eb6ee 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java
@@ -56,6 +56,8 @@ public class ExchangeContext {
      */
     public ExchangeContext(int protocolVer) {
         fetchAffOnJoin = protocolVer == 1;
+
+        coalescing = protocolVer > 1;
     }
 
     /**
@@ -83,6 +85,10 @@ public class ExchangeContext {
         return requestGrpsAffOnJoin;
     }
 
+    public boolean coalescing() {
+        return coalescing;
+    }
+
     public List<List<ClusterNode>> activeAffinity(GridCacheSharedContext cctx,
GridAffinityAssignmentCache aff) {
         List<List<ClusterNode>> assignment = affMap.get(aff.groupId());
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/39835f6a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 6646456..135b771 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -1739,7 +1739,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
     }
 
     private boolean supportsCoalescing(ClusterNode node) {
-        return node.version().compareToIgnoreTimestamp(EXCHANGE_COALESCING_SINCE) >= 0;
+        return exchangeProtocolVersion(node.version()) > 1;
     }
 
     /** */

http://git-wip-us.apache.org/repos/asf/ignite/blob/39835f6a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index a461c1c..9385739 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -36,7 +36,6 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReadWriteLock;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.NearCacheConfiguration;
 import org.apache.ignite.events.CacheEvent;
@@ -64,8 +63,6 @@ import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch;
 import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
 import org.apache.ignite.internal.processors.cache.ExchangeActions;
 import org.apache.ignite.internal.processors.cache.ExchangeContext;
-import org.apache.ignite.internal.processors.cache.ExchangeDiscoveryEvents;
-import org.apache.ignite.internal.processors.cache.ExchangeContext;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
@@ -107,7 +104,6 @@ import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
 import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
 import static org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
-import static org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager.EXCHANGE_COALESCING_SINCE;
 
 /**
  * Future for exchanging partition maps.
@@ -228,10 +224,6 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
     private final AtomicBoolean done = new AtomicBoolean();
 
     /** */
-    @GridToStringExclude
-    private ExchangeContext exchCtx;
-
-    /** */
     private FinishState finishState;
 
     /**
@@ -505,34 +497,34 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
             }
             else {
                 if (exchCtx.coalescing()) {
-                    if (discoEvt.type() == EVT_NODE_JOINED) {
-                        if (discoEvt.eventNode().isLocal()) {
-                            localJoin();
-
-                            if (crdNode) {
-                                exchange = ExchangeType.NONE;
-                            }
-                            else
-                                sendLocalJoinMessage(crd);
-                        }
-                        else {
-                            if (CU.clientNode(discoEvt.eventNode())) {
-                                onClientNodeEvent(crdNode);
-
-                                exchange = ExchangeType.NONE;
-                            }
-                            else {
-                                if (cctx.kernalContext().clientNode())
-                                    exchange = ExchangeType.CLIENT;
-                                else {
-
-                                }
-                            }
-                        }
-                    }
-                    else {
-
-                    }
+//                    if (discoEvt.type() == EVT_NODE_JOINED) {
+//                        if (discoEvt.eventNode().isLocal()) {
+//                            localJoin();
+//
+//                            if (crdNode) {
+//                                exchange = ExchangeType.NONE;
+//                            }
+//                            else
+//                                sendLocalJoinMessage(crd);
+//                        }
+//                        else {
+//                            if (CU.clientNode(discoEvt.eventNode())) {
+//                                onClientNodeEvent(crdNode);
+//
+//                                exchange = ExchangeType.NONE;
+//                            }
+//                            else {
+//                                if (cctx.kernalContext().clientNode())
+//                                    exchange = ExchangeType.CLIENT;
+//                                else {
+//
+//                                }
+//                            }
+//                        }
+//                    }
+//                    else {
+//
+//                    }
                 }
                 else {
                     if (discoEvt.type() == EVT_NODE_JOINED) {
@@ -1460,7 +1452,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
     private GridDhtPartitionsExchangeFuture mergedWith;
 
     /** */
-    private List<T2<ClusterNode, GridDhtPartitionsSingleMessage>> pendingMsgs;
+    private List<T2<UUID, GridDhtPartitionsSingleMessage>> pendingMsgs;
 
     /**
      * @param fut Current exchange to merge with.
@@ -1469,7 +1461,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
     public void mergeWithFuture(final GridDhtPartitionsExchangeFuture fut) {
         log.info("Merge exchange future [fut=" + topologyVersion() + ", mergeWith=" + fut.topologyVersion()
+ ']');
 
-        List<T2<ClusterNode, GridDhtPartitionsSingleMessage>> pendingMsgs = null;
+        List<T2<UUID, GridDhtPartitionsSingleMessage>> pendingMsgs = null;
 
         synchronized (this) {
             synchronized (fut) {
@@ -1482,13 +1474,13 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                 if (this.pendingMsgs != null) {
                     pendingMsgs = this.pendingMsgs;
 
-                    T2<ClusterNode, GridDhtPartitionsSingleMessage> joinedSrvMsg =
null;
+                    T2<UUID, GridDhtPartitionsSingleMessage> joinedSrvMsg = null;
 
                     if (discoEvt.type() == EVT_NODE_JOINED && !CU.clientNode(discoEvt.eventNode()))
{
-                        for (Iterator<T2<ClusterNode, GridDhtPartitionsSingleMessage>>
it = pendingMsgs.iterator(); it.hasNext();) {
-                            T2<ClusterNode, GridDhtPartitionsSingleMessage> msg = it.next();
+                        for (Iterator<T2<UUID, GridDhtPartitionsSingleMessage>>
it = pendingMsgs.iterator(); it.hasNext();) {
+                            T2<UUID, GridDhtPartitionsSingleMessage> msg = it.next();
 
-                            if (msg.get1().equals(discoEvt.eventNode())) {
+                            if (msg.get1().equals(discoEvt.eventNode().id())) {
                                 joinedSrvMsg = msg;
 
                                 it.remove();
@@ -1505,12 +1497,12 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
         }
 
         if (pendingMsgs != null) {
-            final List<T2<ClusterNode, GridDhtPartitionsSingleMessage>> pendingMsgs0
= pendingMsgs;
+            final List<T2<UUID, GridDhtPartitionsSingleMessage>> pendingMsgs0
= pendingMsgs;
 
             fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>()
{
                 @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion>
fut0) {
-                    for (T2<ClusterNode, GridDhtPartitionsSingleMessage> msg : pendingMsgs0)
-                        fut.processMessage(msg.get1(), msg.get2());
+                    for (T2<UUID, GridDhtPartitionsSingleMessage> msg : pendingMsgs0)
+                        fut.processSingleMessage(msg.get1(), msg.get2());
                 }
             });
         }
@@ -1547,7 +1539,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                     if (pendingMsgs == null)
                         pendingMsgs = new ArrayList<>();
 
-                    pendingMsgs.add(new T2<>(node, msg));
+                    pendingMsgs.add(new T2<>(node.id(), msg));
                 }
             }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/39835f6a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
index bd9eaf2..6a4d895 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
@@ -509,6 +509,12 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
                 writer.incrementState();
 
             case 12:
+                if (!writer.writeMessage("resTopVer", resTopVer))
+                    return false;
+
+                writer.incrementState();
+
+            case 13:
                 if (!writer.writeMessage("topVer", topVer))
                     return false;
 
@@ -587,6 +593,14 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
                 reader.incrementState();
 
             case 12:
+                resTopVer = reader.readMessage("resTopVer");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 13:
                 topVer = reader.readMessage("topVer");
 
                 if (!reader.isLastRead())
@@ -606,7 +620,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 13;
+        return 14;
     }
 
     /** {@inheritDoc} */


Mime
View raw message