asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mb...@apache.org
Subject [1/6] asterixdb git commit: [NO ISSUE][RT] Report all errors on SuperActivityOperatorNodePushable
Date Thu, 22 Mar 2018 01:11:26 GMT
Repository: asterixdb
Updated Branches:
  refs/heads/master 1b412c542 -> 760279199


[NO ISSUE][RT] Report all errors on SuperActivityOperatorNodePushable

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Currently, if a failure happens in SuperActivityOperatorNodePushable,
  we only report that failure and miss the rest of the failures.
  This is especially critical in case of job cancellation since we
  don't know where each thread was interrupted.
- After this change, all other failures are attached to the root
  failure as suppressed exceptions, so that they are reported too.

Change-Id: Ibbf31dd91303ce2f606734fcccb19270875266b3
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2500
Reviewed-by: Michael Blow <mblow@apache.org>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/3c32971e
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/3c32971e
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/3c32971e

Branch: refs/heads/master
Commit: 3c32971e4636bc99784cd62185718a401a45e059
Parents: 1b412c5
Author: Abdullah Alamoudi <bamousaa@gmail.com>
Authored: Mon Mar 19 18:40:43 2018 -0700
Committer: Michael Blow <mblow@apache.org>
Committed: Mon Mar 19 21:25:18 2018 -0700

----------------------------------------------------------------------
 .../SuperActivityOperatorNodePushable.java      | 22 +++++--
 .../org/apache/hyracks/control/nc/Task.java     | 65 +++++++++-----------
 2 files changed, 46 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/3c32971e/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
index 83ab532..d499554 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Queue;
+import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.Semaphore;
@@ -44,6 +45,7 @@ import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.JobFlag;
+import org.apache.hyracks.api.util.ExceptionUtils;
 
 /**
  * The runtime of a SuperActivity, which internally executes a DAG of one-to-one
@@ -193,15 +195,20 @@ public class SuperActivityOperatorNodePushable implements IOperatorNodePushable
     }
 
     private void runInParallel(OperatorNodePushableAction action) throws HyracksDataException {
-        List<Future<Void>> tasks = new ArrayList<>();
+        List<Future<Void>> tasks = new ArrayList<>(operatorNodePushablesBFSOrder.size());
+        Queue<Throwable> failures = new ArrayBlockingQueue<>(operatorNodePushablesBFSOrder.size());
         final Semaphore startSemaphore = new Semaphore(1 - operatorNodePushablesBFSOrder.size());
         final Semaphore completeSemaphore = new Semaphore(1 - operatorNodePushablesBFSOrder.size());
+        Throwable root = null;
         try {
             for (final IOperatorNodePushable op : operatorNodePushablesBFSOrder) {
                 tasks.add(ctx.getExecutorService().submit(() -> {
                     startSemaphore.release();
                     try {
                         action.run(op);
+                    } catch (Throwable th) { // NOSONAR: Must catch all causes of failure
+                        failures.offer(th);
+                        throw th;
                     } finally {
                         completeSemaphore.release();
                     }
@@ -211,13 +218,16 @@ public class SuperActivityOperatorNodePushable implements IOperatorNodePushable
             for (Future<Void> task : tasks) {
                 task.get();
             }
-        } catch (InterruptedException e) {
-            cancelTasks(tasks, startSemaphore, completeSemaphore);
-            Thread.currentThread().interrupt();
-            throw HyracksDataException.create(e);
         } catch (ExecutionException e) {
+            root = e.getCause();
+        } catch (Throwable e) { // NOSONAR: Must catch all causes of failure
+            root = e;
+        }
+        if (root != null) {
+            final Throwable failure = root;
             cancelTasks(tasks, startSemaphore, completeSemaphore);
-            throw HyracksDataException.create(e.getCause());
+            failures.forEach(t -> ExceptionUtils.suppress(failure, t));
+            throw HyracksDataException.create(failure);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/3c32971e/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
index dcfc291..9d99968 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
@@ -102,7 +102,7 @@ public class Task implements IHyracksTaskContext, ICounterContext, Runnable {
 
     private volatile boolean aborted;
 
-    private NodeControllerService ncs;
+    private final NodeControllerService ncs;
 
     private List<List<PartitionChannel>> inputChannelsFromConnectors;
 
@@ -286,67 +286,62 @@ public class Task implements IHyracksTaskContext, ICounterContext, Runnable {
         }
         ct.setName(displayName + ":" + taskAttemptId + ":" + 0);
         try {
-            Exception operatorException = null;
+            Throwable operatorException = null;
             try {
                 operator.initialize();
                 if (collectors.length > 0) {
                     final Semaphore sem = new Semaphore(collectors.length - 1);
                     for (int i = 1; i < collectors.length; ++i) {
+                        // Q. Do we ever have a task that has more than one collector?
                         final IPartitionCollector collector = collectors[i];
                         final IFrameWriter writer = operator.getInputFrameWriter(i);
-                        sem.acquire();
+                        sem.acquireUninterruptibly();
                         final int cIdx = i;
                         executorService.execute(() -> {
-                            Thread thread = Thread.currentThread();
-                            // Calls synchronized addPendingThread(..) to make sure that in the abort() method,
-                            // the thread is not escaped from interruption.
-                            if (!addPendingThread(thread)) {
-                                return;
-                            }
-                            thread.setName(displayName + ":" + taskAttemptId + ":" + cIdx);
-                            thread.setPriority(Thread.MIN_PRIORITY);
                             try {
-                                pushFrames(collector, inputChannelsFromConnectors.get(cIdx), writer);
-                            } catch (HyracksDataException e) {
-                                synchronized (Task.this) {
-                                    exceptions.add(e);
+                                Thread thread = Thread.currentThread();
+                                // Calls synchronized addPendingThread(..) to make sure that in the abort() method,
+                                // the thread is not escaped from interruption.
+                                if (!addPendingThread(thread)) {
+                                    return;
+                                }
+                                thread.setName(displayName + ":" + taskAttemptId + ":" + cIdx);
+                                thread.setPriority(Thread.MIN_PRIORITY);
+                                try {
+                                    pushFrames(collector, inputChannelsFromConnectors.get(cIdx), writer);
+                                } catch (HyracksDataException e) {
+                                    synchronized (Task.this) {
+                                        exceptions.add(e);
+                                    }
+                                } finally {
+                                    removePendingThread(thread);
                                 }
                             } finally {
                                 sem.release();
-                                removePendingThread(thread);
                             }
                         });
                     }
                     try {
                         pushFrames(collectors[0], inputChannelsFromConnectors.get(0), operator.getInputFrameWriter(0));
                     } finally {
-                        sem.acquire(collectors.length - 1);
+                        sem.acquireUninterruptibly(collectors.length - 1);
                     }
                 }
-            } catch (Exception e) {
-                // Store the operator exception
+            } catch (Throwable e) { // NOSONAR: Must catch all failures
                 operatorException = e;
-                throw e;
             } finally {
                 try {
                     operator.deinitialize();
-                } catch (Exception e) {
-                    if (operatorException != null) {
-                        // Add deinitialize exception to the operator exception to keep track of both
-                        operatorException.addSuppressed(e);
-                    } else {
-                        operatorException = e;
-                    }
-                    throw operatorException;
+                } catch (Throwable e) { // NOSONAR: Must catch all failures
+                    operatorException = ExceptionUtils.suppress(operatorException, e);
                 }
             }
-            NodeControllerService ncs = joblet.getNodeController();
+            if (operatorException != null) {
+                throw operatorException;
+            }
             ncs.getWorkQueue().schedule(new NotifyTaskCompleteWork(ncs, this));
-        } catch (InterruptedException e) {
-            exceptions.add(e);
-            Thread.currentThread().interrupt();
-        } catch (Exception e) {
-            exceptions.add(e);
+        } catch (Throwable e) { // NOSONAR: Catch all failures
+            exceptions.add(HyracksDataException.create(e));
         } finally {
             close();
             removePendingThread(ct);
@@ -360,7 +355,6 @@ public class Task implements IHyracksTaskContext, ICounterContext, Runnable {
                             exceptions.get(i));
                 }
             }
-            NodeControllerService ncs = joblet.getNodeController();
             ExceptionUtils.setNodeIds(exceptions, ncs.getId());
             ncs.getWorkQueue()
                     .schedule(new NotifyTaskFailureWork(ncs, this, exceptions, joblet.getJobId(), taskAttemptId));
@@ -457,6 +451,7 @@ public class Task implements IHyracksTaskContext, ICounterContext, Runnable {
         return ncs.createOrGetJobParameterByteStore(joblet.getJobId()).getParameterValue(name, start, length);
     }
 
+    @Override
     public Set<JobFlag> getJobFlags() {
         return jobFlags;
     }


Mime
View raw message