asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amo...@apache.org
Subject asterixdb git commit: [ASTERIXDB-2002][HYR] Report failures during task start
Date Tue, 25 Jul 2017 23:56:42 GMT
Repository: asterixdb
Updated Branches:
  refs/heads/master d8192749d -> 2c1c263b0


[ASTERIXDB-2002][HYR] Report failures during task start

- user model changes: no
- storage format changes: no
- interface changes: no

details:
- failures that happen before creating the task object were
  never reported because the task object was null and they
  simply throw null pointer exception.

Change-Id: Ibf79088c1ea08e66a7b130e4836f153ea9603723
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1900
Reviewed-by: Xikui Wang <xkkwww@gmail.com>
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
BAD: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/2c1c263b
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/2c1c263b
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/2c1c263b

Branch: refs/heads/master
Commit: 2c1c263b06872f6a5c274fcad99877b427b7e491
Parents: d819274
Author: Abdullah Alamoudi <bamousaa@gmail.com>
Authored: Tue Jul 25 13:17:13 2017 -0700
Committer: abdullah alamoudi <bamousaa@gmail.com>
Committed: Tue Jul 25 16:56:16 2017 -0700

----------------------------------------------------------------------
 .../SuperActivityOperatorNodePushable.java      |  5 ++--
 .../org/apache/hyracks/control/nc/Task.java     |  8 ++++--
 .../control/nc/work/NotifyTaskFailureWork.java  | 21 ++++++++++----
 .../hyracks/control/nc/work/StartTasksWork.java | 10 +++++--
 .../AbstractMultiNCIntegrationTest.java         | 29 ++++++++++++++-----
 .../tests/integration/JobFailureTest.java       |  2 +-
 .../integration/LocalityAwareConnectorTest.java | 30 ++++++++++----------
 ...onOnCreatePushRuntimeOperatorDescriptor.java |  3 +-
 8 files changed, 71 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2c1c263b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
index def4c83..83ab532 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
@@ -61,7 +61,8 @@ public class SuperActivityOperatorNodePushable implements IOperatorNodePushable
     private int inputArity = 0;
 
     public SuperActivityOperatorNodePushable(SuperActivity parent, Map<ActivityId, IActivity>
startActivities,
-            IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider, int partition,
int nPartitions) {
+            IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider, int partition,
int nPartitions)
+            throws HyracksDataException {
         this.parent = parent;
         this.startActivities = startActivities;
         this.ctx = ctx;
@@ -76,7 +77,7 @@ public class SuperActivityOperatorNodePushable implements IOperatorNodePushable
         try {
             init();
         } catch (Exception e) {
-            throw new IllegalStateException(e);
+            throw HyracksDataException.create(e);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2c1c263b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
index 74a628d..bff2794 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
@@ -275,7 +275,8 @@ public class Task implements IHyracksTaskContext, ICounterContext, Runnable
{
         if (!addPendingThread(ct)) {
             exceptions.add(HyracksDataException.create(TASK_ABORTED, getTaskAttemptId()));
             ExceptionUtils.setNodeIds(exceptions, ncs.getId());
-            ncs.getWorkQueue().schedule(new NotifyTaskFailureWork(ncs, this, exceptions));
+            ncs.getWorkQueue()
+                    .schedule(new NotifyTaskFailureWork(ncs, this, exceptions, joblet.getJobId(),
taskAttemptId));
             return;
         }
         ct.setName(displayName + ":" + taskAttemptId + ":" + 0);
@@ -353,13 +354,14 @@ public class Task implements IHyracksTaskContext, ICounterContext, Runnable
{
                 for (int i = 0; i < exceptions.size(); i++) {
                     LOGGER.log(Level.WARNING,
                             "Task " + taskAttemptId + " failed with exception"
-                                    + (exceptions.size() > 1 ? "s (" + (i + 1) + "/" +
exceptions.size()  + ")" : ""),
+                                    + (exceptions.size() > 1 ? "s (" + (i + 1) + "/" +
exceptions.size() + ")" : ""),
                             exceptions.get(i));
                 }
             }
             NodeControllerService ncs = joblet.getNodeController();
             ExceptionUtils.setNodeIds(exceptions, ncs.getId());
-            ncs.getWorkQueue().schedule(new NotifyTaskFailureWork(ncs, this, exceptions));
+            ncs.getWorkQueue()
+                    .schedule(new NotifyTaskFailureWork(ncs, this, exceptions, joblet.getJobId(),
taskAttemptId));
         }
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2c1c263b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java
index e81fa5a..fa8ba28 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java
@@ -19,7 +19,10 @@
 package org.apache.hyracks.control.nc.work;
 
 import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 
+import org.apache.hyracks.api.dataflow.TaskAttemptId;
 import org.apache.hyracks.api.dataset.IDatasetPartitionManager;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.control.common.work.AbstractWork;
@@ -27,30 +30,36 @@ import org.apache.hyracks.control.nc.NodeControllerService;
 import org.apache.hyracks.control.nc.Task;
 
 public class NotifyTaskFailureWork extends AbstractWork {
+    private static final Logger LOGGER = Logger.getLogger(NotifyTaskFailureWork.class.getName());
     private final NodeControllerService ncs;
     private final Task task;
+    private final JobId jobId;
+    private final TaskAttemptId taskId;
 
     private final List<Exception> exceptions;
 
-    public NotifyTaskFailureWork(NodeControllerService ncs, Task task, List<Exception>
exceptions) {
+    public NotifyTaskFailureWork(NodeControllerService ncs, Task task, List<Exception>
exceptions, JobId jobId,
+            TaskAttemptId taskId) {
         this.ncs = ncs;
         this.task = task;
         this.exceptions = exceptions;
+        this.jobId = jobId;
+        this.taskId = taskId;
     }
 
     @Override
     public void run() {
         try {
-            JobId jobId = task.getJobletContext().getJobId();
             IDatasetPartitionManager dpm = ncs.getDatasetPartitionManager();
             if (dpm != null) {
                 dpm.abortReader(jobId);
             }
-            ncs.getClusterController().notifyTaskFailure(jobId, task.getTaskAttemptId(),
ncs.getId(), exceptions);
-            //exceptions.get(0).printStackTrace();
+            ncs.getClusterController().notifyTaskFailure(jobId, taskId, ncs.getId(), exceptions);
         } catch (Exception e) {
-            e.printStackTrace();
+            LOGGER.log(Level.SEVERE, "Failure reporting task failure to cluster controller",
e);
+        }
+        if (task != null) {
+            task.getJoblet().removeTask(task);
         }
-        task.getJoblet().removeTask(task);
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2c1c263b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
index b55cd4b..c369781 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
@@ -102,6 +102,7 @@ public class StartTasksWork extends AbstractWork {
     @Override
     public void run() {
         Task task = null;
+        int taskIndex = 0;
         try {
             ncs.updateMaxJobId(jobId);
             NCServiceContext serviceCtx = ncs.getContext();
@@ -122,7 +123,8 @@ public class StartTasksWork extends AbstractWork {
                     return ac.getConnectorRecordDescriptorMap().get(conn.getConnectorId());
                 }
             };
-            for (TaskAttemptDescriptor td : taskDescriptors) {
+            while (taskIndex < taskDescriptors.size()) {
+                TaskAttemptDescriptor td = taskDescriptors.get(taskIndex);
                 TaskAttemptId taId = td.getTaskAttemptId();
                 TaskId tid = taId.getTaskId();
                 ActivityId aid = tid.getActivityId();
@@ -133,6 +135,7 @@ public class StartTasksWork extends AbstractWork {
                 }
                 final int partition = tid.getPartition();
                 List<IConnectorDescriptor> inputs = ac.getActivityInputMap().get(aid);
+                task = null;
                 task = new Task(joblet, flags, taId, han.getClass().getName(), ncs.getExecutor(),
ncs,
                         createInputChannels(td, inputs));
                 IOperatorNodePushable operator = han.createPushRuntime(task, rdp, partition,
td.getPartitionCount());
@@ -174,13 +177,16 @@ public class StartTasksWork extends AbstractWork {
                 task.setTaskRuntime(collectors.toArray(new IPartitionCollector[collectors.size()]),
operator);
                 joblet.addTask(task);
                 task.start();
+                taskIndex++;
             }
         } catch (Exception e) {
             LOGGER.log(Level.WARNING, "Failure starting a task", e);
             // notify cc of start task failure
             List<Exception> exceptions = new ArrayList<>();
+            exceptions.add(e);
             ExceptionUtils.setNodeIds(exceptions, ncs.getId());
-            ncs.getWorkQueue().schedule(new NotifyTaskFailureWork(ncs, task, exceptions));
+            TaskAttemptId taskId = taskDescriptors.get(taskIndex).getTaskAttemptId();
+            ncs.getWorkQueue().schedule(new NotifyTaskFailureWork(ncs, task, exceptions,
jobId, taskId));
         }
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2c1c263b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
index 05a7e2d..18479e2 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
@@ -59,8 +59,8 @@ public abstract class AbstractMultiNCIntegrationTest {
 
     private static final Logger LOGGER = Logger.getLogger(AbstractMultiNCIntegrationTest.class.getName());
 
-    public static final String[] ASTERIX_IDS = { "asterix-001", "asterix-002", "asterix-003",
"asterix-004",
-            "asterix-005", "asterix-006", "asterix-007" };
+    public static final String[] ASTERIX_IDS =
+            { "asterix-001", "asterix-002", "asterix-003", "asterix-004", "asterix-005",
"asterix-006", "asterix-007" };
 
     private static ClusterControllerService cc;
 
@@ -103,7 +103,7 @@ public abstract class AbstractMultiNCIntegrationTest {
             ncConfig.setClusterListenAddress("127.0.0.1");
             ncConfig.setDataListenAddress("127.0.0.1");
             ncConfig.setResultListenAddress("127.0.0.1");
-            ncConfig.setIODevices(new String [] { ioDev.getAbsolutePath() });
+            ncConfig.setIODevices(new String[] { ioDev.getAbsolutePath() });
             asterixNCs[i] = new NodeControllerService(ncConfig);
             asterixNCs[i].start();
         }
@@ -138,7 +138,7 @@ public abstract class AbstractMultiNCIntegrationTest {
         hcc.cancelJob(jobId);
     }
 
-    protected void runTest(JobSpecification spec) throws Exception {
+    protected void runTest(JobSpecification spec, String expectedErrorMessage) throws Exception
{
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info(spec.toJSON().asText());
         }
@@ -180,14 +180,29 @@ public abstract class AbstractMultiNCIntegrationTest {
                     try {
                         bbis.close();
                     } catch (IOException e) {
-                        throw new HyracksDataException(e);
+                        throw HyracksDataException.create(e);
                     }
                 }
-
                 readSize = reader.read(resultFrame);
             }
         }
-        hcc.waitForCompletion(jobId);
+        boolean expectedExceptionThrown = false;
+        try {
+            hcc.waitForCompletion(jobId);
+        } catch (HyracksDataException hde) {
+            if (expectedErrorMessage != null) {
+                if (hde.toString().contains(expectedErrorMessage)) {
+                    expectedExceptionThrown = true;
+                } else {
+                    throw hde;
+                }
+            } else {
+                throw hde;
+            }
+        }
+        if (expectedErrorMessage != null && !expectedExceptionThrown) {
+            throw new Exception("Expected error (" + expectedErrorMessage + ") was not thrown");
+        }
         dumpOutputFiles();
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2c1c263b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/JobFailureTest.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/JobFailureTest.java
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/JobFailureTest.java
index 871109a..13a103e 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/JobFailureTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/JobFailureTest.java
@@ -48,7 +48,7 @@ public class JobFailureTest extends AbstractMultiNCIntegrationTest {
         spec.connect(conn, sourceOpDesc, 0, sinkOpDesc, 0);
         spec.addRoot(sinkOpDesc);
         try {
-            runTest(spec);
+            runTest(spec, ExceptionOnCreatePushRuntimeOperatorDescriptor.ERROR_MESSAGE);
         } catch (Exception e) {
             e.printStackTrace();
             throw e;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2c1c263b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/LocalityAwareConnectorTest.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/LocalityAwareConnectorTest.java
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/LocalityAwareConnectorTest.java
index e92113a..67642f4 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/LocalityAwareConnectorTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/LocalityAwareConnectorTest.java
@@ -69,15 +69,15 @@ import org.junit.Test;
 
 public class LocalityAwareConnectorTest extends AbstractMultiNCIntegrationTest {
 
-    final IFileSplitProvider splitProvider = new ConstantFileSplitProvider(
-            new FileSplit[] { new ManagedFileSplit(ASTERIX_IDS[0], "data" + File.separator
+ "tpch0.001"
-                    + File.separator + "lineitem.tbl"),
-                    new ManagedFileSplit(ASTERIX_IDS[1], "data" + File.separator + "tpch0.001"
+ File.separator
-                            + "lineitem.tbl"),
-                    new ManagedFileSplit(ASTERIX_IDS[2], "data" + File.separator + "tpch0.001"
+ File.separator
-                            + "lineitem.tbl"),
-                    new ManagedFileSplit(ASTERIX_IDS[3], "data" + File.separator + "tpch0.001"
+ File.separator
-                            + "lineitem.tbl") });
+    final IFileSplitProvider splitProvider = new ConstantFileSplitProvider(new FileSplit[]
{
+            new ManagedFileSplit(ASTERIX_IDS[0],
+                    "data" + File.separator + "tpch0.001" + File.separator + "lineitem.tbl"),
+            new ManagedFileSplit(ASTERIX_IDS[1],
+                    "data" + File.separator + "tpch0.001" + File.separator + "lineitem.tbl"),
+            new ManagedFileSplit(ASTERIX_IDS[2],
+                    "data" + File.separator + "tpch0.001" + File.separator + "lineitem.tbl"),
+            new ManagedFileSplit(ASTERIX_IDS[3],
+                    "data" + File.separator + "tpch0.001" + File.separator + "lineitem.tbl")
});
 
     final int fileSize = 800 * 1024 * 4;
 
@@ -112,8 +112,8 @@ public class LocalityAwareConnectorTest extends AbstractMultiNCIntegrationTest
{
 
         JobSpecification spec = new JobSpecification();
 
-        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider,
tupleParserFactory,
-                desc);
+        FileScanOperatorDescriptor csvScanner =
+                new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory, desc);
 
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, "asterix-001",
"asterix-002",
                 "asterix-003", "asterix-004");
@@ -163,7 +163,7 @@ public class LocalityAwareConnectorTest extends AbstractMultiNCIntegrationTest
{
         spec.connect(conn2, grouper, 0, printer, 0);
 
         spec.addRoot(printer);
-        runTest(spec);
+        runTest(spec, null);
     }
 
     /**
@@ -177,8 +177,8 @@ public class LocalityAwareConnectorTest extends AbstractMultiNCIntegrationTest
{
 
         JobSpecification spec = new JobSpecification();
 
-        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider,
tupleParserFactory,
-                desc);
+        FileScanOperatorDescriptor csvScanner =
+                new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory, desc);
 
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, "asterix-001",
"asterix-002",
                 "asterix-003", "asterix-004");
@@ -221,7 +221,7 @@ public class LocalityAwareConnectorTest extends AbstractMultiNCIntegrationTest
{
         spec.connect(conn2, grouper, 0, printer, 0);
 
         spec.addRoot(printer);
-        runTest(spec);
+        runTest(spec, null);
     }
 
     private AbstractSingleActivityOperatorDescriptor getPrinter(JobSpecification spec, String
prefix)

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2c1c263b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/util/ExceptionOnCreatePushRuntimeOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/util/ExceptionOnCreatePushRuntimeOperatorDescriptor.java
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/util/ExceptionOnCreatePushRuntimeOperatorDescriptor.java
index f814cd5..d704671 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/util/ExceptionOnCreatePushRuntimeOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/util/ExceptionOnCreatePushRuntimeOperatorDescriptor.java
@@ -38,6 +38,7 @@ public class ExceptionOnCreatePushRuntimeOperatorDescriptor extends AbstractSing
     private static AtomicInteger createPushRuntime = new AtomicInteger();
     private static AtomicInteger initializeCounter = new AtomicInteger();
     private static AtomicInteger openCloseCounter = new AtomicInteger();
+    public static final String ERROR_MESSAGE = "I throw exceptions";
     private final int[] exceptionPartitions;
     private final boolean sleepOnInitialize;
 
@@ -56,7 +57,7 @@ public class ExceptionOnCreatePushRuntimeOperatorDescriptor extends AbstractSing
             if (exceptionPartitions != null) {
                 for (int p : exceptionPartitions) {
                     if (p == partition) {
-                        throw new HyracksDataException("I throw exceptions");
+                        throw new HyracksDataException(ERROR_MESSAGE);
                     }
                 }
             }


Mime
View raw message