asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amo...@apache.org
Subject [1/2] asterixdb git commit: [ASTERIXDB-2058][HYR] Only Complete job cancellation after cleanup
Date Thu, 24 Aug 2017 18:09:33 GMT
Repository: asterixdb
Updated Branches:
  refs/heads/master 5eb13036d -> 87411c22c


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87411c22/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ParseDurationTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ParseDurationTest.java
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ParseDurationTest.java
new file mode 100644
index 0000000..12c61d6
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ParseDurationTest.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime;
+
+import org.apache.asterix.api.http.server.Duration;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ParseDurationTest {
+
+    @Test
+    public void test() throws Exception {
+        // simple
+        Assert.assertEquals(0, Duration.parseDurationStringToNanos("0"));
+        Assert.assertEquals(java.util.concurrent.TimeUnit.SECONDS.toNanos(5),
+                Duration.parseDurationStringToNanos("5s"));
+        Assert.assertEquals(java.util.concurrent.TimeUnit.SECONDS.toNanos(30),
+                Duration.parseDurationStringToNanos("30s"));
+        Assert.assertEquals(java.util.concurrent.TimeUnit.SECONDS.toNanos(1478),
+                Duration.parseDurationStringToNanos("1478s"));
+        Assert.assertEquals(java.util.concurrent.TimeUnit.SECONDS.toNanos(-5),
+                Duration.parseDurationStringToNanos("-5s"));
+        Assert.assertEquals(java.util.concurrent.TimeUnit.SECONDS.toNanos(5),
+                Duration.parseDurationStringToNanos("+5s"));
+        Assert.assertEquals(java.util.concurrent.TimeUnit.SECONDS.toNanos(0),
+                Duration.parseDurationStringToNanos("-0"));
+        Assert.assertEquals(java.util.concurrent.TimeUnit.SECONDS.toNanos(0),
+                Duration.parseDurationStringToNanos("+0"));
+        Assert.assertEquals(java.util.concurrent.TimeUnit.SECONDS.toNanos(5),
+                Duration.parseDurationStringToNanos("5.0s"));
+        Assert.assertEquals(
+                java.util.concurrent.TimeUnit.SECONDS.toNanos(5)
+                        + java.util.concurrent.TimeUnit.MILLISECONDS.toNanos(600),
+                Duration.parseDurationStringToNanos("5.6s"));
+        Assert.assertEquals(java.util.concurrent.TimeUnit.SECONDS.toNanos(5),
+                Duration.parseDurationStringToNanos("5.s"));
+        Assert.assertEquals(java.util.concurrent.TimeUnit.MILLISECONDS.toNanos(500),
+                Duration.parseDurationStringToNanos(".5s"));
+        Assert.assertEquals(java.util.concurrent.TimeUnit.SECONDS.toNanos(1),
+                Duration.parseDurationStringToNanos("1.0s"));
+        Assert.assertEquals(java.util.concurrent.TimeUnit.SECONDS.toNanos(1),
+                Duration.parseDurationStringToNanos("1.00s"));
+        Assert.assertEquals(
+                java.util.concurrent.TimeUnit.SECONDS.toNanos(1)
+                        + java.util.concurrent.TimeUnit.MILLISECONDS.toNanos(4),
+                Duration.parseDurationStringToNanos("1.004s"));
+        Assert.assertEquals(
+                java.util.concurrent.TimeUnit.SECONDS.toNanos(1)
+                        + java.util.concurrent.TimeUnit.MILLISECONDS.toNanos(4),
+                Duration.parseDurationStringToNanos("1.0040s"));
+        Assert.assertEquals(
+                java.util.concurrent.TimeUnit.SECONDS.toNanos(100)
+                        + java.util.concurrent.TimeUnit.MILLISECONDS.toNanos(1),
+                Duration.parseDurationStringToNanos("100.00100s"));
+        Assert.assertEquals(java.util.concurrent.TimeUnit.NANOSECONDS.toNanos(10),
+                Duration.parseDurationStringToNanos("10ns"));
+        Assert.assertEquals(java.util.concurrent.TimeUnit.MICROSECONDS.toNanos(11),
+                Duration.parseDurationStringToNanos("11us"));
+        Assert.assertEquals(java.util.concurrent.TimeUnit.MICROSECONDS.toNanos(12),
+                Duration.parseDurationStringToNanos("12µs"));
+        Assert.assertEquals(java.util.concurrent.TimeUnit.MICROSECONDS.toNanos(12),
+                Duration.parseDurationStringToNanos("12μs"));
+        Assert.assertEquals(java.util.concurrent.TimeUnit.MILLISECONDS.toNanos(13),
+                Duration.parseDurationStringToNanos("13ms"));
+        Assert.assertEquals(java.util.concurrent.TimeUnit.SECONDS.toNanos(14),
+                Duration.parseDurationStringToNanos("14s"));
+        Assert.assertEquals(java.util.concurrent.TimeUnit.MINUTES.toNanos(15),
+                Duration.parseDurationStringToNanos("15m"));
+        Assert.assertEquals(java.util.concurrent.TimeUnit.HOURS.toNanos(16),
+                Duration.parseDurationStringToNanos("16h"));
+        Assert.assertEquals(
+                java.util.concurrent.TimeUnit.HOURS.toNanos(3) + java.util.concurrent.TimeUnit.MINUTES.toNanos(30),
+                Duration.parseDurationStringToNanos("3h30m"));
+        Assert.assertEquals(
+                java.util.concurrent.TimeUnit.SECONDS.toNanos(10)
+                        + java.util.concurrent.TimeUnit.MILLISECONDS.toNanos(500)
+                        + java.util.concurrent.TimeUnit.MINUTES.toNanos(4),
+                Duration.parseDurationStringToNanos("10.5s4m"));
+        Assert.assertEquals(
+                java.util.concurrent.TimeUnit.MINUTES.toNanos(-2) + java.util.concurrent.TimeUnit.SECONDS.toNanos(-3)
+                        + java.util.concurrent.TimeUnit.MILLISECONDS.toNanos(-400),
+                Duration.parseDurationStringToNanos("-2m3.4s"));
+        Assert.assertEquals(
+                java.util.concurrent.TimeUnit.HOURS.toNanos(1) + java.util.concurrent.TimeUnit.MINUTES.toNanos(2)
+                        + java.util.concurrent.TimeUnit.SECONDS.toNanos(3)
+                        + java.util.concurrent.TimeUnit.MILLISECONDS.toNanos(4)
+                        + java.util.concurrent.TimeUnit.MICROSECONDS.toNanos(5)
+                        + java.util.concurrent.TimeUnit.NANOSECONDS.toNanos(6),
+                Duration.parseDurationStringToNanos("1h2m3s4ms5us6ns"));
+        Assert.assertEquals(
+                java.util.concurrent.TimeUnit.HOURS.toNanos(39) + java.util.concurrent.TimeUnit.MINUTES.toNanos(9)
+                        + java.util.concurrent.TimeUnit.SECONDS.toNanos(14)
+                        + java.util.concurrent.TimeUnit.MILLISECONDS.toNanos(425),
+                Duration.parseDurationStringToNanos("39h9m14.425s"));
+        Assert.assertEquals(java.util.concurrent.TimeUnit.NANOSECONDS.toNanos(52763797000L),
+                Duration.parseDurationStringToNanos("52763797000ns"));
+        Assert.assertEquals(1199999998800L, Duration.parseDurationStringToNanos("0.3333333333333333333333h"));
+        Assert.assertEquals(9007199254740993L, Duration.parseDurationStringToNanos("9007199254740993ns"));
+        Assert.assertEquals(9223372036854775807L, Duration.parseDurationStringToNanos("9223372036854775807ns"));
+        Assert.assertEquals(9223372036854775807L, Duration.parseDurationStringToNanos("9223372036854775.807us"));
+        Assert.assertEquals(9223372036854775807L, Duration.parseDurationStringToNanos("9223372036s854ms775us807ns"));
+        Assert.assertEquals(-9223372036854775807L, Duration.parseDurationStringToNanos("-9223372036854775807ns"));
+        assertFail("");
+        assertFail("3");
+        assertFail("-");
+        assertFail("s");
+        assertFail(".");
+        assertFail("-.");
+        assertFail(".s");
+        assertFail("+.s");
+        assertFail("3000000h");
+        assertFail("9223372036854775808ns");
+        assertFail("9223372036854775.808us");
+        assertFail("9223372036854ms775us808ns");
+        assertFail("-9223372036854775808ns");
+    }
+
+    @Test
+    public void testDurationFormatNanos() throws Exception {
+        Assert.assertEquals("123.456789012s", Duration.formatNanos(123456789012l));
+        Assert.assertEquals("12.345678901s", Duration.formatNanos(12345678901l));
+        Assert.assertEquals("1.234567890s", Duration.formatNanos(1234567890l));
+        Assert.assertEquals("123.456789ms", Duration.formatNanos(123456789l));
+        Assert.assertEquals("12.345678ms", Duration.formatNanos(12345678l));
+        Assert.assertEquals("1.234567ms", Duration.formatNanos(1234567l));
+        Assert.assertEquals("123.456µs", Duration.formatNanos(123456l));
+        Assert.assertEquals("12.345µs", Duration.formatNanos(12345l));
+        Assert.assertEquals("1.234µs", Duration.formatNanos(1234l));
+        Assert.assertEquals("123ns", Duration.formatNanos(123l));
+        Assert.assertEquals("12ns", Duration.formatNanos(12l));
+        Assert.assertEquals("1ns", Duration.formatNanos(1l));
+        Assert.assertEquals("-123.456789012s", Duration.formatNanos(-123456789012l));
+        Assert.assertEquals("120.000000000s", Duration.formatNanos(120000000000l));
+        Assert.assertEquals("-12ns", Duration.formatNanos(-12l));
+    }
+
+    private void assertFail(String duration) {
+        try {
+            Duration.parseDurationStringToNanos(duration);
+            Assert.fail("Expected parseDuration(" + duration + ") to fail but it didn't");
+        } catch (HyracksDataException hde) {
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87411c22/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
index 143f7d1..fcfb428 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
@@ -68,6 +68,9 @@ public class ErrorCode {
     public static final int POLYGON_3_POINTS = 25;
     public static final int POLYGON_INVALID = 26;
     public static final int OPERATION_NOT_SUPPORTED = 27;
+    public static final int INVALID_DURATION = 28;
+    public static final int UNKNOWN_DURATION_UNIT = 29;
+    public static final int QUERY_TIMEOUT = 30;
 
     public static final int INSTANTIATION_ERROR = 100;
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87411c22/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
index 715c27d..5bd5482 100644
--- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
+++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
@@ -61,6 +61,9 @@
 25 = Polygon must have at least 3 points
 26 = %1$s can not be an instance of polygon
 27 = Operation not supported
+28 = Invalid duration %1$s
+29 = Unknown duration unit %1$s
+30 = Query timed out
 
 100 = Unable to instantiate class %1$s
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87411c22/asterixdb/asterix-metadata/pom.xml
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/pom.xml b/asterixdb/asterix-metadata/pom.xml
index 5497cf4..458bb67 100644
--- a/asterixdb/asterix-metadata/pom.xml
+++ b/asterixdb/asterix-metadata/pom.xml
@@ -16,7 +16,8 @@
  ! specific language governing permissions and limitations
  ! under the License.
  !-->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
   <modelVersion>4.0.0</modelVersion>
   <parent>
     <artifactId>apache-asterixdb</artifactId>

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87411c22/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
index e2868ae..95479c1 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
@@ -130,6 +130,9 @@ public class HyracksClientInterfaceFunctions {
 
         public CancelJobFunction(JobId jobId) {
             this.jobId = jobId;
+            if (jobId == null) {
+                throw new IllegalArgumentException("jobId");
+            }
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87411c22/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
index dbbaf9f..a3078b6 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
@@ -65,6 +65,8 @@ import org.apache.hyracks.control.cc.partitions.PartitionMatchMaker;
 import org.apache.hyracks.control.cc.work.JobCleanupWork;
 import org.apache.hyracks.control.common.job.PartitionState;
 import org.apache.hyracks.control.common.job.TaskAttemptDescriptor;
+import org.apache.hyracks.control.common.work.NoOpCallback;
+import org.apache.hyracks.control.common.work.IResultCallback;
 
 public class JobExecutor {
     private static final Logger LOGGER = Logger.getLogger(JobExecutor.class.getName());
@@ -114,9 +116,10 @@ public class JobExecutor {
         ccs.getContext().notifyJobStart(jobRun.getJobId());
     }
 
-    public void cancelJob() throws HyracksException {
+    public void cancelJob(IResultCallback<Void> callback) throws HyracksException {
         // If the job is already terminated or failed, do nothing here.
         if (jobRun.getPendingStatus() != null) {
+            callback.setValue(null);
             return;
         }
         // Sets the cancelled flag.
@@ -124,7 +127,8 @@ public class JobExecutor {
         // Aborts on-ongoing task clusters.
         abortOngoingTaskClusters(ta -> false, ta -> null);
         // Aborts the whole job.
-        abortJob(Collections.singletonList(HyracksException.create(ErrorCode.JOB_CANCELED,
jobRun.getJobId())));
+        abortJob(Collections.singletonList(HyracksException.create(ErrorCode.JOB_CANCELED,
jobRun.getJobId())),
+                callback);
     }
 
     private void findRunnableTaskClusterRoots(Set<TaskCluster> frontier, Collection<ActivityCluster>
roots)
@@ -196,8 +200,8 @@ public class JobExecutor {
                     "Runnable TC roots: " + taskClusterRoots + ", inProgressTaskClusters:
" + inProgressTaskClusters);
         }
         if (taskClusterRoots.isEmpty() && inProgressTaskClusters.isEmpty()) {
-            ccs.getWorkQueue()
-                    .schedule(new JobCleanupWork(ccs.getJobManager(), jobRun.getJobId(),
JobStatus.TERMINATED, null));
+            ccs.getWorkQueue().schedule(new JobCleanupWork(ccs.getJobManager(), jobRun.getJobId(),
JobStatus.TERMINATED,
+                    null, NoOpCallback.INSTANCE));
             return;
         }
         startRunnableTaskClusters(taskClusterRoots);
@@ -520,14 +524,14 @@ public class JobExecutor {
         }
     }
 
-    public void abortJob(List<Exception> exceptions) {
+    public void abortJob(List<Exception> exceptions, IResultCallback<Void> callback)
{
         Set<TaskCluster> inProgressTaskClustersCopy = new HashSet<>(inProgressTaskClusters);
         for (TaskCluster tc : inProgressTaskClustersCopy) {
             abortTaskCluster(findLastTaskClusterAttempt(tc), TaskClusterAttempt.TaskClusterStatus.ABORTED);
         }
         assert inProgressTaskClusters.isEmpty();
-        ccs.getWorkQueue()
-                .schedule(new JobCleanupWork(ccs.getJobManager(), jobRun.getJobId(), JobStatus.FAILURE,
exceptions));
+        ccs.getWorkQueue().schedule(
+                new JobCleanupWork(ccs.getJobManager(), jobRun.getJobId(), JobStatus.FAILURE,
exceptions, callback));
     }
 
     private void abortTaskCluster(TaskClusterAttempt tcAttempt,
@@ -686,7 +690,7 @@ public class JobExecutor {
                         + " as failed and the number of max re-attempts = " + maxReattempts);
                 if (lastAttempt.getAttempt() >= maxReattempts || isCancelled()) {
                     LOGGER.log(Level.INFO, "Aborting the job of " + ta.getTaskAttemptId());
-                    abortJob(exceptions);
+                    abortJob(exceptions, NoOpCallback.INSTANCE);
                     return;
                 }
                 LOGGER.log(Level.INFO, "We will try to start runnable activity clusters of
" + ta.getTaskAttemptId());
@@ -696,7 +700,7 @@ public class JobExecutor {
                         "Ignoring task failure notification: " + taId + " -- Current last
attempt = " + lastAttempt);
             }
         } catch (Exception e) {
-            abortJob(Collections.singletonList(e));
+            abortJob(Collections.singletonList(e), NoOpCallback.INSTANCE);
         }
     }
 
@@ -720,7 +724,7 @@ public class JobExecutor {
                     ta -> HyracksException.create(ErrorCode.NODE_FAILED, ta.getNodeId()));
             startRunnableActivityClusters();
         } catch (Exception e) {
-            abortJob(Collections.singletonList(e));
+            abortJob(Collections.singletonList(e), NoOpCallback.INSTANCE);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87411c22/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/IJobManager.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/IJobManager.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/IJobManager.java
index 8fe542f..a9ddee3 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/IJobManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/IJobManager.java
@@ -25,6 +25,7 @@ import java.util.List;
 import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.control.common.work.IResultCallback;
 
 /**
  * This interface abstracts the job lifecycle management and job scheduling for a cluster.
@@ -47,10 +48,12 @@ public interface IJobManager {
     /**
      * Cancel a job with a given job id.
      *
+     * @param callback
+     *
      * @param jobId,
      *            the id of the job.
      */
-    void cancel(JobId jobId) throws HyracksException;
+    void cancel(JobId jobId, IResultCallback<Void> callback) throws HyracksException;
 
     /**
      * This method is called when the master process decides to complete job.

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87411c22/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
index abf1d57..4ba847d 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
@@ -46,6 +46,8 @@ import org.apache.hyracks.control.cc.cluster.INodeManager;
 import org.apache.hyracks.control.cc.scheduler.FIFOJobQueue;
 import org.apache.hyracks.control.cc.scheduler.IJobQueue;
 import org.apache.hyracks.control.common.controllers.CCConfig;
+import org.apache.hyracks.control.common.work.NoOpCallback;
+import org.apache.hyracks.control.common.work.IResultCallback;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -115,17 +117,14 @@ public class JobManager implements IJobManager {
     }
 
     @Override
-    public void cancel(JobId jobId) throws HyracksException {
-        if (jobId == null) {
-            return;
-        }
+    public void cancel(JobId jobId, IResultCallback<Void> callback) throws HyracksException
{
         // Cancels a running job.
         if (activeRunMap.containsKey(jobId)) {
             JobRun jobRun = activeRunMap.get(jobId);
             // The following call will abort all ongoing tasks and then consequently
             // trigger JobCleanupWork and JobCleanupNotificationWork which will update the
lifecyle of the job.
             // Therefore, we do not remove the job out of activeRunMap here.
-            jobRun.getExecutor().cancelJob();
+            jobRun.getExecutor().cancelJob(callback);
             return;
         }
         // Removes a pending job.
@@ -138,6 +137,7 @@ public class JobManager implements IJobManager {
             runMapArchive.put(jobId, jobRun);
             runMapHistory.put(jobId, exceptions);
         }
+        callback.setValue(null);
     }
 
     @Override
@@ -322,7 +322,7 @@ public class JobManager implements IJobManager {
             // fail the job then abort it
             run.setStatus(JobStatus.FAILURE, exceptions);
             // abort job will trigger JobCleanupWork
-            run.getExecutor().abortJob(exceptions);
+            run.getExecutor().abortJob(exceptions, NoOpCallback.INSTANCE);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87411c22/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/CancelJobWork.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/CancelJobWork.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/CancelJobWork.java
index f3b67c9..e3135df 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/CancelJobWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/CancelJobWork.java
@@ -42,10 +42,7 @@ public class CancelJobWork extends SynchronizableWork {
     @Override
     protected void doRun() throws Exception {
         try {
-            if (jobId != null) {
-                jobManager.cancel(jobId);
-            }
-            callback.setValue(null);
+            jobManager.cancel(jobId, callback);
         } catch (Exception e) {
             callback.setException(e);
         }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87411c22/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobCleanupWork.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobCleanupWork.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobCleanupWork.java
index 502ac50..bb85c13 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobCleanupWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobCleanupWork.java
@@ -29,6 +29,7 @@ import org.apache.hyracks.api.job.JobStatus;
 import org.apache.hyracks.control.cc.job.IJobManager;
 import org.apache.hyracks.control.cc.job.JobRun;
 import org.apache.hyracks.control.common.work.AbstractWork;
+import org.apache.hyracks.control.common.work.IResultCallback;
 
 public class JobCleanupWork extends AbstractWork {
     private static final Logger LOGGER = Logger.getLogger(JobCleanupWork.class.getName());
@@ -37,12 +38,15 @@ public class JobCleanupWork extends AbstractWork {
     private JobId jobId;
     private JobStatus status;
     private List<Exception> exceptions;
+    private IResultCallback<Void> callback;
 
-    public JobCleanupWork(IJobManager jobManager, JobId jobId, JobStatus status, List<Exception>
exceptions) {
+    public JobCleanupWork(IJobManager jobManager, JobId jobId, JobStatus status, List<Exception>
exceptions,
+            IResultCallback<Void> callback) {
         this.jobManager = jobManager;
         this.jobId = jobId;
         this.status = status;
         this.exceptions = exceptions;
+        this.callback = callback;
     }
 
     @Override
@@ -53,6 +57,7 @@ public class JobCleanupWork extends AbstractWork {
         try {
             JobRun jobRun = jobManager.get(jobId);
             jobManager.prepareComplete(jobRun, status, exceptions);
+            callback.setValue(null);
         } catch (HyracksException e) {
             // Fail the job with the caught exception during final completion.
             JobRun run = jobManager.get(jobId);
@@ -62,6 +67,7 @@ public class JobCleanupWork extends AbstractWork {
             }
             completionException.add(0, e);
             run.setStatus(JobStatus.FAILURE, completionException);
+            callback.setException(e);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87411c22/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java
index ed2a740..0d46d64 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java
@@ -48,6 +48,7 @@ import org.apache.hyracks.control.cc.cluster.NodeManager;
 import org.apache.hyracks.control.common.base.INodeController;
 import org.apache.hyracks.control.common.controllers.CCConfig;
 import org.apache.hyracks.control.common.logs.LogFile;
+import org.apache.hyracks.control.common.work.NoOpCallback;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -207,7 +208,7 @@ public class JobManagerTest {
     }
 
     @Test
-    public void testCancel() throws HyracksException {
+    public void testCancel() throws Exception {
         CCConfig ccConfig = new CCConfig();
         IJobCapacityController jobCapacityController = mock(IJobCapacityController.class);
         IJobManager jobManager = spy(new JobManager(ccConfig, mockClusterControllerService(),
jobCapacityController));
@@ -247,12 +248,12 @@ public class JobManagerTest {
 
         // Cancels deferred jobs.
         for (JobRun run : deferredRuns) {
-            jobManager.cancel(run.getJobId());
+            jobManager.cancel(run.getJobId(), NoOpCallback.INSTANCE);
         }
 
         // Cancels runnable jobs.
         for (JobRun run : acceptedRuns) {
-            jobManager.cancel(run.getJobId());
+            jobManager.cancel(run.getJobId(), NoOpCallback.INSTANCE);
         }
 
         Assert.assertTrue(jobManager.getPendingJobs().isEmpty());

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87411c22/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/FutureValue.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/FutureValue.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/FutureValue.java
index 150e0e8..ca0c7c4 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/FutureValue.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/FutureValue.java
@@ -19,6 +19,7 @@
 package org.apache.hyracks.control.common.work;
 
 public class FutureValue<T> implements IResultCallback<T> {
+
     private boolean done;
 
     private T value;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87411c22/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/NoOpCallback.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/NoOpCallback.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/NoOpCallback.java
new file mode 100644
index 0000000..041cee0
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/NoOpCallback.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.common.work;
+
+public class NoOpCallback implements IResultCallback<Void> {
+
+    public static final NoOpCallback INSTANCE = new NoOpCallback();
+
+    private NoOpCallback() {
+    }
+
+    @Override
+    public void setValue(Void result) {
+        // Dummy is used when no callback is provided
+    }
+
+    @Override
+    public void setException(Exception e) {
+        // Dummy is used when no callback is provided
+    }
+
+}


Mime
View raw message