ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a.@apache.org
Subject [18/27] ignite git commit: IGNITE-4676 Fixed hang if closure executed nested internal task with continuation. Added test. (cherry picked from commit e7a5307)
Date Wed, 31 May 2017 12:28:04 GMT
IGNITE-4676 Fixed hang if closure executed nested internal task with continuation. Added test.
(cherry picked from commit e7a5307)


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/9cd7e0f8
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/9cd7e0f8
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/9cd7e0f8

Branch: refs/heads/ignite-5232-1.7.2
Commit: 9cd7e0f8d132f9b7c496fe64f75f271ef60da5eb
Parents: ebc4a16
Author: Alexey Kuznetsov <akuznetsov@apache.org>
Authored: Thu Feb 9 16:44:41 2017 +0700
Committer: Alexey Kuznetsov <akuznetsov@apache.org>
Committed: Wed May 17 09:57:35 2017 +0700

----------------------------------------------------------------------
 .../internal/processors/job/GridJobWorker.java  |  4 +
 .../internal/GridContinuousTaskSelfTest.java    | 79 ++++++++++++++++++++
 2 files changed, 83 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/9cd7e0f8/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
index 6a00d96..acefde7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
@@ -617,6 +617,10 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject
{
                 // Finish here only if not held by this thread.
                 if (!HOLD.get())
                     finishJob(res, ex, sndRes);
+                else
+                    // Make sure flag is not set for current thread.
+                    // This may happen in case of nested internal task call with continuation.
+                    HOLD.set(false);
 
                 ctx.job().currentTaskSession(null);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/9cd7e0f8/modules/core/src/test/java/org/apache/ignite/internal/GridContinuousTaskSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridContinuousTaskSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/GridContinuousTaskSelfTest.java
index 98e3c5a..cec2887 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/GridContinuousTaskSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/GridContinuousTaskSelfTest.java
@@ -21,10 +21,12 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Timer;
 import java.util.TimerTask;
+import java.util.concurrent.Callable;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCompute;
 import org.apache.ignite.IgniteException;
@@ -43,7 +45,9 @@ import org.apache.ignite.compute.ComputeTaskSession;
 import org.apache.ignite.compute.ComputeTaskSessionAttributeListener;
 import org.apache.ignite.compute.ComputeTaskSessionFullSupport;
 import org.apache.ignite.compute.ComputeTaskSplitAdapter;
+import org.apache.ignite.internal.processors.task.GridInternal;
 import org.apache.ignite.lang.IgniteClosure;
+import org.apache.ignite.resources.IgniteInstanceResource;
 import org.apache.ignite.resources.JobContextResource;
 import org.apache.ignite.resources.LoggerResource;
 import org.apache.ignite.resources.TaskContinuousMapperResource;
@@ -51,6 +55,7 @@ import org.apache.ignite.resources.TaskSessionResource;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.apache.ignite.testframework.junits.common.GridCommonTest;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Continuous task test.
@@ -195,6 +200,80 @@ public class GridContinuousTaskSelfTest extends GridCommonAbstractTest
{
         }
     }
 
+    /**
+     * @throws Exception If test failed.
+     */
+    public void testClosureWithNestedInternalTask() throws Exception {
+        try {
+            IgniteEx ignite = startGrid(0);
+
+            ComputeTaskInternalFuture<String> fut = ignite.context().closure().callAsync(GridClosureCallMode.BALANCE,
new Callable<String>() {
+                /** */
+                @IgniteInstanceResource
+                private IgniteEx g;
+
+                @Override public String call() throws Exception {
+                    return g.compute(g.cluster()).execute(NestedHoldccTask.class, null);
+                }
+            }, ignite.cluster().nodes());
+
+            assertEquals("DONE", fut.get(3000));
+        }
+        finally {
+            stopGrid(0, true);
+        }
+    }
+
+    /** Test task with continuation. */
+    @GridInternal
+    public static class NestedHoldccTask extends ComputeTaskAdapter<String, String>
{
+        /** {@inheritDoc} */
+        @Nullable @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode>
subgrid,
+            @Nullable String arg) throws IgniteException {
+            Map<ComputeJob, ClusterNode> map = new HashMap<>();
+
+            for (ClusterNode node : subgrid)
+                map.put(new NestedHoldccJob(), node);
+
+            return map;
+
+        }
+
+        /** {@inheritDoc} */
+        @Nullable @Override public String reduce(List<ComputeJobResult> results) throws
IgniteException {
+            return results.get(0).getData();
+        }
+    }
+
+    /** Test job. */
+    public static class NestedHoldccJob extends ComputeJobAdapter {
+        /** */
+        @JobContextResource
+        private ComputeJobContext jobCtx;
+
+        /** */
+        private int cnt = 0;
+
+        /** {@inheritDoc} */
+        @Override public Object execute() throws IgniteException {
+            if (cnt < 1) {
+                cnt++;
+
+                jobCtx.holdcc();
+
+                new Timer().schedule(new TimerTask() {
+                    @Override public void run() {
+                        jobCtx.callcc();
+                    }
+                }, 500);
+
+                return "NOT DONE";
+            }
+
+            return "DONE";
+        }
+    }
+
     /** */
     @SuppressWarnings({"PublicInnerClass"})
     public static class TestMultipleHoldccCallsClosure implements IgniteClosure<Object,
Boolean> {


Mime
View raw message