ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sevdoki...@apache.org
Subject [41/50] [abbrv] incubator-ignite git commit: ignite-45 - Fixed topology updates.
Date Mon, 16 Mar 2015 09:23:28 GMT
ignite-45 - Fixed topology updates.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/74813a60
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/74813a60
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/74813a60

Branch: refs/heads/ignite-421
Commit: 74813a60127c9c5584b95f7a3ca755a232a6f983
Parents: 84e3196
Author: Alexey Goncharuk <agoncharuk@gridgain.com>
Authored: Sun Mar 15 22:06:45 2015 -0700
Committer: Alexey Goncharuk <agoncharuk@gridgain.com>
Committed: Sun Mar 15 22:06:45 2015 -0700

----------------------------------------------------------------------
 .../src/main/java/org/apache/ignite/Ignite.java |  4 ---
 .../GridCachePartitionExchangeManager.java      |  6 ++--
 .../dht/GridClientPartitionTopology.java        | 19 ++++++++-----
 .../dht/GridDhtPartitionTopology.java           |  4 +--
 .../dht/GridDhtPartitionTopologyImpl.java       | 13 +++++----
 .../preloader/GridDhtPartitionExchangeId.java   |  8 ------
 .../GridDhtPartitionsExchangeFuture.java        | 30 ++++++++++++++++----
 7 files changed, 49 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/74813a60/modules/core/src/main/java/org/apache/ignite/Ignite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/Ignite.java b/modules/core/src/main/java/org/apache/ignite/Ignite.java
index 4a5b469..7201734 100644
--- a/modules/core/src/main/java/org/apache/ignite/Ignite.java
+++ b/modules/core/src/main/java/org/apache/ignite/Ignite.java
@@ -41,7 +41,6 @@ import java.util.concurrent.*;
  * <p>
  * In addition to {@link ClusterGroup} functionality, from here you can get the following:
  * <ul>
- * <li>{@link org.apache.ignite.cache.GridCache} - functionality for in-memory distributed
cache.</li>
  * <li>{@link IgniteDataStreamer} - functionality for loading data large amounts of
data into cache.</li>
  * <li>{@link IgniteFileSystem} - functionality for distributed Hadoop-compliant in-memory
file system and map-reduce.</li>
  * <li>{@link IgniteStreamer} - functionality for streaming events workflow with queries
and indexes into rolling windows.</li>
@@ -198,9 +197,6 @@ public interface Ignite extends AutoCloseable {
 
     public <K, V> IgniteCache<K, V> createCache(NearCacheConfiguration<K,
V> nearCfg);
 
-    // TODO IGNITE-45
-    //public <K, V> IgniteCache<K, V> createCache(String name, String path);
-
     /**
      * Stops dynamically started cache.
      *

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/74813a60/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 3e236bf..1c4c0f8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -332,17 +332,17 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
     /**
      * @param cacheId Cache ID.
-     * @param exchId Exchange ID.
+     * @param exchFut Exchange future.
      * @return Topology.
      */
-    public GridDhtPartitionTopology clientTopology(int cacheId, GridDhtPartitionExchangeId
exchId) {
+    public GridDhtPartitionTopology clientTopology(int cacheId, GridDhtPartitionsExchangeFuture
exchFut) {
         GridClientPartitionTopology top = clientTops.get(cacheId);
 
         if (top != null)
             return top;
 
         GridClientPartitionTopology old = clientTops.putIfAbsent(cacheId,
-            top = new GridClientPartitionTopology(cctx, cacheId, exchId));
+            top = new GridClientPartitionTopology(cctx, cacheId, exchFut));
 
         return old != null ? old : top;
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/74813a60/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index 646234c..27e1c22 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@ -80,18 +80,21 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology
{
     /**
      * @param cctx Context.
      * @param cacheId Cache ID.
-     * @param exchId Exchange ID.
+     * @param exchFut Exchange ID.
      */
-    public GridClientPartitionTopology(GridCacheSharedContext cctx, int cacheId,
-        GridDhtPartitionExchangeId exchId) {
+    public GridClientPartitionTopology(
+        GridCacheSharedContext cctx,
+        int cacheId,
+        GridDhtPartitionsExchangeFuture exchFut
+    ) {
         this.cctx = cctx;
         this.cacheId = cacheId;
 
-        topVer = exchId.topologyVersion();
+        topVer = exchFut.topologyVersion();
 
         log = cctx.logger(getClass());
 
-        beforeExchange(exchId);
+        beforeExchange(exchFut);
     }
 
     /**
@@ -181,7 +184,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology
{
     }
 
     /** {@inheritDoc} */
-    @Override public void beforeExchange(GridDhtPartitionExchangeId exchId) {
+    @Override public void beforeExchange(GridDhtPartitionsExchangeFuture exchFut) {
         ClusterNode loc = cctx.localNode();
 
         lock.writeLock().lock();
@@ -190,6 +193,8 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology
{
             if (stopping)
                 return;
 
+            GridDhtPartitionExchangeId exchId = exchFut.exchangeId();
+
             assert topVer.equals(exchId.topologyVersion()) : "Invalid topology version [topVer="
+
                 topVer + ", exchId=" + exchId + ']';
 
@@ -205,7 +210,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology
{
             long updateSeq = this.updateSeq.incrementAndGet();
 
             // If this is the oldest node.
-            if (oldest.id().equals(loc.id())) {
+            if (oldest.id().equals(loc.id()) || exchFut.isCacheAdded(cacheId)) {
                 if (node2part == null) {
                     node2part = new GridDhtPartitionFullMap(loc.id(), loc.order(), updateSeq);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/74813a60/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
index 7db15c7..dcc0502 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
@@ -67,10 +67,10 @@ public interface GridDhtPartitionTopology {
     /**
      * Pre-initializes this topology.
      *
-     * @param exchId Exchange ID for this pre-initialization.
+     * @param exchFut Exchange future.
      * @throws IgniteCheckedException If failed.
      */
-    public void beforeExchange(GridDhtPartitionExchangeId exchId) throws IgniteCheckedException;
+    public void beforeExchange(GridDhtPartitionsExchangeFuture exchFut) throws IgniteCheckedException;
 
     /**
      * Post-initializes this topology.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/74813a60/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 7a172cd..c123e92 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -207,7 +207,7 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology
{
     }
 
     /** {@inheritDoc} */
-    @Override public void beforeExchange(GridDhtPartitionExchangeId exchId) throws IgniteCheckedException
{
+    @Override public void beforeExchange(GridDhtPartitionsExchangeFuture exchFut) throws
IgniteCheckedException {
         waitForRent();
 
         ClusterNode loc = cctx.localNode();
@@ -217,6 +217,8 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology
{
         lock.writeLock().lock();
 
         try {
+            GridDhtPartitionExchangeId exchId = exchFut.exchangeId();
+
             if (stopping)
                 return;
 
@@ -235,7 +237,7 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology
{
             long updateSeq = this.updateSeq.incrementAndGet();
 
             // If this is the oldest node.
-            if (oldest.id().equals(loc.id()) || exchId.isCacheAdded(cctx.cacheId())) {
+            if (oldest.id().equals(loc.id()) || exchFut.isCacheAdded(cctx.cacheId())) {
                 if (node2part == null) {
                     node2part = new GridDhtPartitionFullMap(loc.id(), loc.order(), updateSeq);
 
@@ -262,9 +264,10 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology
{
             if (cctx.rebalanceEnabled()) {
                 for (int p = 0; p < num; p++) {
                     // If this is the first node in grid.
-                    if ((oldest.id().equals(loc.id()) && oldest.id().equals(exchId.nodeId()))
|| exchId.isCacheAdded(
-                        cctx.cacheId())) {
-                        assert exchId.isJoined() || exchId.isCacheAdded(cctx.cacheId());
+                    boolean added = exchFut.isCacheAdded(cctx.cacheId());
+
+                    if ((oldest.id().equals(loc.id()) && oldest.id().equals(exchId.nodeId())
&& exchId.isJoined()) || added) {
+                        assert exchId.isJoined() || added;
 
                         try {
                             GridDhtLocalPartition locPart = localPartition(p, topVer, true,
false);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/74813a60/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionExchangeId.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionExchangeId.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionExchangeId.java
index 96cd7cb..22910fd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionExchangeId.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionExchangeId.java
@@ -106,14 +106,6 @@ public class GridDhtPartitionExchangeId implements Message, Comparable<GridDhtPa
         return evt == EVT_NODE_LEFT || evt == EVT_NODE_FAILED;
     }
 
-    /**
-     * @return {@code True} if cache was added with this exchange ID.
-     */
-    public boolean isCacheAdded(int cacheId) {
-        // TODO IGNITE-45 add cache added flag.
-        return evt == EVT_DISCOVERY_CUSTOM_EVT;
-    }
-
     /** {@inheritDoc} */
     @Override public void writeExternal(ObjectOutput out) throws IOException {
         U.writeUuid(out, nodeId);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/74813a60/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 19c609d..7858b84 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -275,6 +275,23 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
     }
 
     /**
+     * @param cacheId Cache ID to check.
+     * @return {@code True} if cache was added during this exchange.
+     */
+    public boolean isCacheAdded(int cacheId) {
+        if (!F.isEmpty(reqs)) {
+            for (DynamicCacheChangeRequest req : reqs) {
+                if (req.isStart() && !req.clientStartOnly()) {
+                    if (CU.cacheId(req.cacheName()) == cacheId)
+                        return true;
+                }
+            }
+        }
+
+        return false;
+    }
+
+    /**
      * Rechecks topology.
      */
     private void initTopology(GridCacheContext cacheCtx) throws IgniteCheckedException {
@@ -488,13 +505,13 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
                     assert topVer.equals(top.topologyVersion()) :
                         "Topology version is updated only in this class instances inside
single ExchangeWorker thread.";
 
-                    top.beforeExchange(exchId);
+                    top.beforeExchange(this);
                 }
 
                 for (GridClientPartitionTopology top : cctx.exchange().clientTopologies())
{
                     top.updateTopologyVersion(exchId, this, stopping(top.cacheId()));
 
-                    top.beforeExchange(exchId);
+                    top.beforeExchange(this);
                 }
             }
             catch (IgniteInterruptedCheckedException e) {
@@ -549,7 +566,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
      * @return {@code True} if no distributed exchange is needed.
      */
     private boolean canSkipExchange() {
-        return false; // TODO ignite-45;
+        return false; // TODO ignite-23;
     }
 
     /**
@@ -907,7 +924,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
             if (cacheCtx != null)
                 cacheCtx.topology().update(exchId, entry.getValue());
             else if (CU.oldest(cctx).isLocal())
-                cctx.exchange().clientTopology(cacheId, exchId).update(exchId, entry.getValue());
+                cctx.exchange().clientTopology(cacheId, this).update(exchId, entry.getValue());
         }
     }
 
@@ -922,7 +939,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
             GridCacheContext cacheCtx = cctx.cacheContext(cacheId);
 
             GridDhtPartitionTopology top = cacheCtx != null ? cacheCtx.topology() :
-                cctx.exchange().clientTopology(cacheId, exchId);
+                cctx.exchange().clientTopology(cacheId, this);
 
             top.update(exchId, entry.getValue());
         }
@@ -977,7 +994,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
                                             try {
                                                 for (GridCacheContext cacheCtx : cctx.cacheContexts())
{
                                                     if (!cacheCtx.isLocal())
-                                                        cacheCtx.topology().beforeExchange(exchId);
+                                                        cacheCtx.topology().beforeExchange(
+                                                            GridDhtPartitionsExchangeFuture.this);
                                                 }
                                             }
                                             catch (IgniteCheckedException e) {


Mime
View raw message