ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yzhda...@apache.org
Subject ignite git commit: wip on data streamer
Date Fri, 14 Jul 2017 11:24:40 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-5658 6fc5ad90f -> 19a2147cd


wip on data streamer


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/19a2147c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/19a2147c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/19a2147c

Branch: refs/heads/ignite-5658
Commit: 19a2147cde4a4ada064908224ba695d6b176b5b9
Parents: 6fc5ad9
Author: Yakov Zhdanov <yzhdanov@gridgain.com>
Authored: Fri Jul 14 14:24:28 2017 +0300
Committer: Yakov Zhdanov <yzhdanov@gridgain.com>
Committed: Fri Jul 14 14:24:28 2017 +0300

----------------------------------------------------------------------
 .../datastreamer/DataStreamerImpl.java          | 12 +--
 .../DataStreamProcessorSelfTest.java            | 93 +++++++++-----------
 2 files changed, 46 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/19a2147c/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index c25a5ed..b7c359c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -1465,13 +1465,12 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K,
V>, Delayed
          * @throws IgniteInterruptedCheckedException If thread has been interrupted.
          */
         @Nullable IgniteInternalFuture<?> flush() throws IgniteInterruptedCheckedException
{
-            List<DataStreamerEntry> entries0 = null;
-            GridFutureAdapter<Object> curFut0 = null;
-
             acquireRemapSemaphore();
 
             for (PerPartitionBuffer b : entriesMap.values()) {
                 AffinityTopologyVersion batchTopVer = null;
+                List<DataStreamerEntry> entries0 = null;
+                GridFutureAdapter<Object> curFut0 = null;
 
                 synchronized (b) {
                     if (!b.entries.isEmpty()) {
@@ -1516,11 +1515,8 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K,
V>, Delayed
          * @throws IgniteInterruptedCheckedException If thread has been interrupted.
          */
         private void incrementActiveTasks() throws IgniteInterruptedCheckedException {
-            if (timeout == DFLT_UNLIMIT_TIMEOUT) {
-                if (sem.availablePermits() <= 0)
-                    U.debug(log, "No Permits node=" + node.order() + " " + node.isLocal()
+ " " + isLocNode);
+            if (timeout == DFLT_UNLIMIT_TIMEOUT)
                 U.acquire(sem);
-            }
             else if (!U.tryAcquire(sem, timeout, TimeUnit.MILLISECONDS)) {
                 if (log.isDebugEnabled())
                     log.debug("Failed to add parallel operation.");
@@ -1535,9 +1531,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K,
V>, Delayed
         private void signalTaskFinished(IgniteInternalFuture<Object> f) {
             assert f != null;
 
-
             sem.release();
-            U.debug(log, "Released: " + sem.availablePermits() + " node=" + node.order());
         }
 
         /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/19a2147c/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java
index e33d811..ec5e6d0 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java
@@ -473,7 +473,7 @@ public class DataStreamProcessorSelfTest extends GridCommonAbstractTest
{
             final AtomicBoolean done = new AtomicBoolean();
 
             try {
-                final int totalPutCnt = 10000;
+                final int totalPutCnt = 50000;
 
                 IgniteInternalFuture<?> fut1 = multithreadedAsync(new Callable<Object>()
{
                     @Override public Object call() throws Exception {
@@ -491,67 +491,60 @@ public class DataStreamProcessorSelfTest extends GridCommonAbstractTest
{
                             futs.add(ldr.addData(idx, idx));
                         }
 
-//                        ldr.flush();
+                        ldr.flush();
 
-//                        for (IgniteFuture<?> fut : futs) {
-//                            info("Before: " + fut);
-//
-//                            fut.get();
-//
-//                            info("After.");
-//                        }
+                        for (IgniteFuture<?> fut : futs)
+                            fut.get();
 
                         return null;
                     }
                 }, 5, "producer");
 
-//                IgniteInternalFuture<?> fut2 = multithreadedAsync(new Callable<Object>()
{
-//                    @Override public Object call() throws Exception {
-//                        while (!done.get()) {
-//                            ldr.flush();
-//
-//                            U.sleep(100);
-//                        }
-//
-//                        return null;
-//                    }
-//                }, 1, "flusher");
+                IgniteInternalFuture<?> fut2 = multithreadedAsync(new Callable<Object>()
{
+                    @Override public Object call() throws Exception {
+                        while (!done.get()) {
+                            ldr.flush();
+
+                            U.sleep(100);
+                        }
+
+                        return null;
+                    }
+                }, 1, "flusher");
 
                 // Define index of node being restarted.
                 final int restartNodeIdx = nodesCntCache + nodesCntNoCache + 1;
 
-                //TODO Uncomment
-
-//                IgniteInternalFuture<?> fut3 = multithreadedAsync(new Callable<Object>()
{
-//                    @Override public Object call() throws Exception {
-//                        try {
-//                            for (int i = 0; i < 5; i++) {
-//                                Ignite g = startGrid(restartNodeIdx);
-//
-//                                UUID id = g.cluster().localNode().id();
-//
-//                                info(">>>>>>> Started node: " + id);
-//
-//                                U.sleep(1000);
-//
-//                                stopGrid(getTestIgniteInstanceName(restartNodeIdx), true);
-//
-//                                info(">>>>>>> Stopped node: " + id);
-//                            }
-//                        }
-//                        finally {
-//                            done.set(true);
-//
-//                            info("Start stop thread finished.");
-//                        }
-//
-//                        return null;
-//                    }
-//                }, 1, "start-stop-thread");
+                IgniteInternalFuture<?> fut3 = multithreadedAsync(new Callable<Object>()
{
+                    @Override public Object call() throws Exception {
+                        try {
+                            for (int i = 0; i < 5; i++) {
+                                Ignite g = startGrid(restartNodeIdx);
+
+                                UUID id = g.cluster().localNode().id();
+
+                                info(">>>>>>> Started node: " + id);
+
+                                U.sleep(1000);
+
+                                stopGrid(getTestIgniteInstanceName(restartNodeIdx), true);
+
+                                info(">>>>>>> Stopped node: " + id);
+                            }
+                        }
+                        finally {
+                            done.set(true);
+
+                            info("Start stop thread finished.");
+                        }
+
+                        return null;
+                    }
+                }, 1, "start-stop-thread");
 
                 fut1.get();
-//                fut2.get();
-//                fut3.get();
+                fut2.get();
+                fut3.get();
             }
             finally {
                 ldr.close(false);


Mime
View raw message