asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From buyin...@apache.org
Subject asterixdb git commit: Cancel the on-going job if waitForCompletion is interrupted.
Date Sat, 10 Jun 2017 06:36:00 GMT
Repository: asterixdb
Updated Branches:
  refs/heads/master 486c4cdd2 -> d6ca4b56e


Cancel the on-going job if waitForCompletion is interrupted.

Change-Id: I3417271660e815a13fd706e1cc057bca6a625c37
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1825
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
BAD: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mblow@apache.org>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/d6ca4b56
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/d6ca4b56
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/d6ca4b56

Branch: refs/heads/master
Commit: d6ca4b56ebcbdee8e87f89448c696847a9b05582
Parents: 486c4cd
Author: Yingyi Bu <yingyi@couchbase.com>
Authored: Fri Jun 9 20:41:49 2017 -0700
Committer: Yingyi Bu <buyingyi@gmail.com>
Committed: Fri Jun 9 23:35:38 2017 -0700

----------------------------------------------------------------------
 .../hyracks/api/client/HyracksConnection.java   |  8 +++-
 .../AbstractMultiNCIntegrationTest.java         | 24 ++++--------
 .../tests/integration/CancelJobTest.java        | 41 ++++++++++++++++++++
 3 files changed, 56 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d6ca4b56/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
index 4b3aff2..ad54110 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
@@ -138,7 +138,13 @@ public final class HyracksConnection implements IHyracksClientConnection
{
 
     @Override
     public void waitForCompletion(JobId jobId) throws Exception {
-        hci.waitForCompletion(jobId);
+        try {
+            hci.waitForCompletion(jobId);
+        } catch (InterruptedException e) {
+            // Cancels an on-going job if the current thread gets interrupted.
+            hci.cancelJob(jobId);
+            throw e;
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d6ca4b56/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
index 148d4f5..05a7e2d 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
@@ -26,8 +26,6 @@ import java.util.List;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ArrayNode;
 import org.apache.commons.io.FileUtils;
 import org.apache.hyracks.api.client.HyracksConnection;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
@@ -40,6 +38,7 @@ import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.job.JobFlag;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.api.job.JobStatus;
 import org.apache.hyracks.api.job.resource.IJobCapacityController;
 import org.apache.hyracks.client.dataset.HyracksDataset;
 import org.apache.hyracks.control.cc.BaseCCApplication;
@@ -52,8 +51,9 @@ import org.apache.hyracks.dataflow.common.comm.io.ResultFrameTupleAccessor;
 import org.apache.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.rules.TemporaryFolder;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
 
 public abstract class AbstractMultiNCIntegrationTest {
 
@@ -70,9 +70,6 @@ public abstract class AbstractMultiNCIntegrationTest {
 
     private final List<File> outputFiles;
 
-    @Rule
-    public TemporaryFolder outputFolder = new TemporaryFolder();
-
     public AbstractMultiNCIntegrationTest() {
         outputFiles = new ArrayList<>();
     }
@@ -133,6 +130,10 @@ public abstract class AbstractMultiNCIntegrationTest {
         hcc.waitForCompletion(jobId);
     }
 
+    protected JobStatus getJobStatus(JobId jobId) throws Exception {
+        return hcc.getJobStatus(jobId);
+    }
+
     protected void cancelJob(JobId jobId) throws Exception {
         hcc.cancelJob(jobId);
     }
@@ -207,15 +208,6 @@ public abstract class AbstractMultiNCIntegrationTest {
         }
     }
 
-    protected File createTempFile() throws IOException {
-        File tempFile = File.createTempFile(getClass().getName(), ".tmp", outputFolder.getRoot());
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Output file: " + tempFile.getAbsolutePath());
-        }
-        outputFiles.add(tempFile);
-        return tempFile;
-    }
-
     public static class DummyApplication extends BaseCCApplication {
 
         @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d6ca4b56/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/CancelJobTest.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/CancelJobTest.java
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/CancelJobTest.java
index 7c3b66f..7eba9e7 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/CancelJobTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/CancelJobTest.java
@@ -37,6 +37,7 @@ import org.apache.hyracks.api.io.FileSplit;
 import org.apache.hyracks.api.io.ManagedFileSplit;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.api.job.JobStatus;
 import org.apache.hyracks.api.job.resource.ClusterCapacity;
 import org.apache.hyracks.api.job.resource.IClusterCapacity;
 import org.apache.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
@@ -61,6 +62,14 @@ import org.junit.Test;
 public class CancelJobTest extends AbstractMultiNCIntegrationTest {
 
     @Test
+    public void interruptJobClientAfterWaitForCompletion() throws Exception {
+        // Interrupts the job client after waitForCompletion() is called.
+        for (JobSpecification spec : testJobs()) {
+            interruptAfterWaitForCompletion(spec);
+        }
+    }
+
+    @Test
     public void cancelExecutingJobAfterWaitForCompletion() throws Exception {
         //Cancels executing jobs after waitForCompletion() is called.
         for (JobSpecification spec : testJobs()) {
@@ -167,6 +176,38 @@ public class CancelJobTest extends AbstractMultiNCIntegrationTest {
         }
     }
 
+    private void interruptAfterWaitForCompletion(JobSpecification spec) throws Exception
{
+        // Submits the job
+        final JobId jobIdForInterruptTest = startJob(spec);
+
+        // Waits for completion in anther thread
+        Thread thread = new Thread(() -> {
+            try {
+                waitForCompletion(jobIdForInterruptTest);
+            } catch (Exception e) {
+                Assert.assertTrue(e instanceof InterruptedException);
+            }
+        });
+        thread.start();
+
+        // Interrupts the wait-for-completion thread.
+        thread.interrupt();
+
+        // Waits until the thread terminates.
+        thread.join();
+
+        // Verifies the job status.
+        JobStatus jobStatus = getJobStatus(jobIdForInterruptTest);
+        while (jobStatus == JobStatus.RUNNING) {
+            synchronized (this) {
+                // Since job cancellation is asynchronous on NCs, we have to wait there.
+                wait(1000);
+            }
+            jobStatus = getJobStatus(jobIdForInterruptTest);
+        }
+        Assert.assertTrue(jobStatus == JobStatus.FAILURE);
+    }
+
     private void cancelWithoutWait(JobSpecification spec) throws Exception {
         JobId jobId = startJob(spec);
         cancelJob(jobId);


Mime
View raw message