ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [1/3] incubator-ignite git commit: #ignite-710: Add checking version in DataStreamProcessor.
Date Wed, 22 Apr 2015 12:03:15 GMT
Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-710 b010fad7c -> 14d0b0215


#ignite-710: Add checking version in DataStreamProcessor.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/7d55edca
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/7d55edca
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/7d55edca

Branch: refs/heads/ignite-710
Commit: 7d55edca569ea2a8e30ada9a13a10677d4164add
Parents: b010fad
Author: ivasilinets <ivasilinets@gridgain.com>
Authored: Wed Apr 22 13:43:43 2015 +0300
Committer: ivasilinets <ivasilinets@gridgain.com>
Committed: Wed Apr 22 13:43:43 2015 +0300

----------------------------------------------------------------------
 .../GridCachePartitionExchangeManager.java      |  2 +-
 .../datastreamer/DataStreamProcessor.java       | 12 ++++
 .../datastreamer/DataStreamerImpl.java          |  3 +-
 .../datastreamer/DataStreamerRequest.java       | 32 ++++++++--
 .../DataStreamerMultiThreadedSelfTest.java      | 62 +++++++++-----------
 5 files changed, 70 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7d55edca/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 7a1324f..b10074f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -409,7 +409,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
      * @param ver Topology version.
      * @return Future or {@code null} is future is already completed.
      */
-    @Nullable IgniteInternalFuture<?> affinityReadyFuture(AffinityTopologyVersion ver) {
+    public @Nullable IgniteInternalFuture<?> affinityReadyFuture(AffinityTopologyVersion ver) {
         GridDhtPartitionsExchangeFuture lastInitializedFut0 = lastInitializedFut;
 
         if (lastInitializedFut0 != null && lastInitializedFut0.topologyVersion().compareTo(ver) >= 0) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7d55edca/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
index 3a2936f..fc7e8fa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
@@ -22,6 +22,7 @@ import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.managers.communication.*;
 import org.apache.ignite.internal.managers.deployment.*;
 import org.apache.ignite.internal.processors.*;
+import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.util.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
@@ -251,6 +252,17 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter {
             Exception err = null;
 
             try {
+                AffinityTopologyVersion locAffVer = ctx.cache().context().exchange().readyAffinityVersion();
+                AffinityTopologyVersion rmtAffVer = req.topologyVersion();
+
+                if (locAffVer.compareTo(rmtAffVer) < 0) {
+                    if (log.isDebugEnabled())
+                        log.debug("Received request has higher affinity topology version [request=" + req +
+                            ", locTopVer=" + locAffVer + ", rmtTopVer=" + rmtAffVer + ']');
+
+                    ctx.cache().context().exchange().affinityReadyFuture(rmtAffVer).get();
+                }
+
                 job.call();
             }
             catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7d55edca/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index 002831c..a69e033 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -1173,7 +1173,8 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
                     dep != null ? dep.userVersion() : null,
                     dep != null ? dep.participants() : null,
                     dep != null ? dep.classLoaderId() : null,
-                    dep == null);
+                    dep == null,
+                    ctx.cache().context().exchange().readyAffinityVersion());
 
                 try {
                     ctx.io().send(node, TOPIC_DATASTREAM, req, PUBLIC_POOL);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7d55edca/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerRequest.java
index a216ffe..6f48118 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerRequest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.datastreamer;
 
 import org.apache.ignite.configuration.*;
 import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.util.tostring.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.lang.*;
@@ -77,6 +78,9 @@ public class DataStreamerRequest implements Message {
     /** */
     private boolean forceLocDep;
 
+    /** Topology version. */
+    private AffinityTopologyVersion topVer;
+
     /**
      * {@code Externalizable} support.
      */
@@ -111,7 +115,8 @@ public class DataStreamerRequest implements Message {
         String userVer,
         Map<UUID, IgniteUuid> ldrParticipants,
         IgniteUuid clsLdrId,
-        boolean forceLocDep) {
+        boolean forceLocDep,
+        @NotNull AffinityTopologyVersion topVer) {
         this.reqId = reqId;
         this.resTopicBytes = resTopicBytes;
         this.cacheName = cacheName;
@@ -125,6 +130,7 @@ public class DataStreamerRequest implements Message {
         this.ldrParticipants = ldrParticipants;
         this.clsLdrId = clsLdrId;
         this.forceLocDep = forceLocDep;
+        this.topVer = topVer;
     }
 
     /**
@@ -218,6 +224,10 @@ public class DataStreamerRequest implements Message {
         return forceLocDep;
     }
 
+    public AffinityTopologyVersion topologyVersion() {
+        return topVer;
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(DataStreamerRequest.class, this);
@@ -302,12 +312,18 @@ public class DataStreamerRequest implements Message {
                 writer.incrementState();
 
             case 11:
-                if (!writer.writeByteArray("updaterBytes", updaterBytes))
+                if (!writer.writeMessage("topVer", topVer))
                     return false;
 
                 writer.incrementState();
 
             case 12:
+                if (!writer.writeByteArray("updaterBytes", updaterBytes))
+                    return false;
+
+                writer.incrementState();
+
+            case 13:
                 if (!writer.writeString("userVer", userVer))
                     return false;
 
@@ -419,7 +435,7 @@ public class DataStreamerRequest implements Message {
                 reader.incrementState();
 
             case 11:
-                updaterBytes = reader.readByteArray("updaterBytes");
+                topVer = reader.readMessage("topVer");
 
                 if (!reader.isLastRead())
                     return false;
@@ -427,6 +443,14 @@ public class DataStreamerRequest implements Message {
                 reader.incrementState();
 
             case 12:
+                updaterBytes = reader.readByteArray("updaterBytes");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 13:
                 userVer = reader.readString("userVer");
 
                 if (!reader.isLastRead())
@@ -446,6 +470,6 @@ public class DataStreamerRequest implements Message {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 13;
+        return 14;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7d55edca/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerMultiThreadedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerMultiThreadedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerMultiThreadedSelfTest.java
index 023730f..017d6a6 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerMultiThreadedSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerMultiThreadedSelfTest.java
@@ -20,12 +20,15 @@ package org.apache.ignite.internal.processors.datastreamer;
 import org.apache.ignite.*;
 import org.apache.ignite.configuration.*;
 import org.apache.ignite.internal.*;
+import org.apache.ignite.lang.*;
 import org.apache.ignite.spi.discovery.tcp.*;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.*;
 import org.apache.ignite.testframework.junits.common.*;
 
 import java.util.*;
+import java.util.concurrent.*;
 import java.util.concurrent.atomic.*;
 
 import static org.apache.ignite.cache.CacheMode.*;
@@ -52,18 +55,10 @@ public class DataStreamerMultiThreadedSelfTest extends GridCommonAbstractTest {
         ccfg.setBackups(1);
         cfg.setCacheConfiguration(ccfg);
 
-        if (gridName.contains("0"))
-            cfg.setClientMode(true);
-
         return cfg;
     }
 
     /** {@inheritDoc} */
-    @Override protected void beforeTestsStarted() throws Exception {
-        stopAllGrids();
-    }
-
-    /** {@inheritDoc} */
     @Override protected void afterTest() throws Exception {
         stopAllGrids();
     }
@@ -72,43 +67,40 @@ public class DataStreamerMultiThreadedSelfTest extends GridCommonAbstractTest {
      * @throws Exception If failed.
      */
     public void testStartStopIgnites() throws Exception {
-        final Ignite ignite = startGrid(0);
+        for (int attempt = 0; attempt < 10; ++attempt) {
+            final Ignite ignite = startGrid(0);
 
-        for (int a = 0; a < 10; ++a) {
-            final IgniteDataStreamer<Object, Object> dataLdr = ignite.dataStreamer(null);
+            try (final DataStreamerImpl<Object, Object> dataLdr = (DataStreamerImpl)ignite.dataStreamer(null)) {
 
-            final AtomicBoolean stop = new AtomicBoolean();
-            final AtomicInteger id = new AtomicInteger();
+                dataLdr.maxRemapCount(0);
 
-            IgniteInternalFuture<?> fut = multithreadedAsync(new Runnable() {
-                @Override public void run() {
-                    Set<Ignite> ignites = new HashSet<>();
-                    try {
-                        for (int i = 1; i < 5; ++i)
-                            ignites.add(startGrid(id.incrementAndGet()));
-                    }
-                    catch (Exception e) {
-                        new IgniteException(e);
+                final AtomicInteger id = new AtomicInteger(1);
+
+                IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() {
+                    @Override public Object call() throws Exception {
+                        for (int i = 1; i < 10; ++i)
+                            startGrid(id.incrementAndGet());
+                        return true;
                     }
-                    finally {
-                        stop.set(true);
+                }, 1, "startedGridThread");
 
-                        System.out.println("!!!!!!!!!!!!!!!stop grid");
+                Random random = new Random();
 
-                        for (Ignite ignite1 : ignites)
-                            stopGrid(ignite1.name());
-                    }
-                }
-            }, 2);
+                Set<IgniteFuture> futs = new HashSet<>();
 
-            int j = 0;
+                while (!fut.isDone())
+                    futs.add(dataLdr.addData(random.nextInt(10000), random.nextInt(10000)));
 
-            while (!stop.get())
-                dataLdr.addData(j++, j);
+                int j = 0;
 
-            fut.get();
+                for (IgniteFuture f : futs) {
+                    System.out.println("Futures " + j  + " from " + futs.size());
+
+                    f.get();
+                }
+            }
 
-            ignite.cache(null).removeAll();
+            stopAllGrids();
         }
     }
 }


Mime
View raw message