asterixdb-notifications mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "abdullah alamoudi (Code Review)" <do-not-re...@asterixdb.incubator.apache.org>
Subject Change in asterixdb[master]: ASTERIXDB-1838 Fix SuperActivityOperatorNodePushable
Date Thu, 13 Apr 2017 22:10:11 GMT
abdullah alamoudi has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/1681

Change subject: ASTERIXDB-1838 Fix SuperActivityOperatorNodePushable
......................................................................

ASTERIXDB-1838 Fix SuperActivityOperatorNodePushable

Change-Id: Ie5994f8a51dcf43e42325e89215758c310cd7b99
---
M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
1 file changed, 86 insertions(+), 24 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/81/1681/1

diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
index 1c4f916..523b468 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
@@ -25,9 +25,10 @@
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Queue;
 import java.util.Map.Entry;
+import java.util.Queue;
 import java.util.concurrent.Future;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hyracks.api.comm.IFrameWriter;
@@ -43,20 +44,22 @@
 /**
  * The runtime of a SuperActivity, which internally executes a DAG of one-to-one
  * connected activities in a single thread.
- *
- * @author yingyib
  */
 public class SuperActivityOperatorNodePushable implements IOperatorNodePushable {
-    private final Map<ActivityId, IOperatorNodePushable> operatorNodePushables = new HashMap<ActivityId, IOperatorNodePushable>();
-    private final List<IOperatorNodePushable> operatorNodePushablesBFSOrder = new ArrayList<IOperatorNodePushable>();
+    private final Map<ActivityId, IOperatorNodePushable> operatorNodePushables = new HashMap<>();
+    private final List<IOperatorNodePushable> operatorNodePushablesBFSOrder = new ArrayList<>();
     private final Map<ActivityId, IActivity> startActivities;
     private final SuperActivity parent;
     private final IHyracksTaskContext ctx;
     private final IRecordDescriptorProvider recordDescProvider;
+    private final ReentrantReadWriteLock actionLock = new ReentrantReadWriteLock();
+    private volatile boolean cancelled = false;
     private final int partition;
     private final int nPartitions;
     private int inputArity = 0;
     private boolean[] startedInitialization;
+    private boolean[] completedInitialization;
+    private boolean[] completedDeinitialization;
 
     public SuperActivityOperatorNodePushable(SuperActivity parent, Map<ActivityId, IActivity> startActivities,
             IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
@@ -82,22 +85,37 @@
     public void initialize() throws HyracksDataException {
         // Initializes all OperatorNodePushables in parallel and then finally deinitializes them.
         runInParallel((op, index) -> {
-            startedInitialization[index] = true;
-            op.initialize();
-        });
+            actionLock.readLock().lock();
+            try {
+                if (cancelled) {
+                    return;
+                }
+                startedInitialization[index] = true;
+            } finally {
+                actionLock.readLock().unlock();
+            }
+            try {
+                op.initialize();
+            } finally {
+                synchronized (op) {
+                    completedInitialization[index] = true;
+                    op.notify();
+                }
+            }
+        }, true);
     }
 
     private void init() throws HyracksDataException {
-        Map<ActivityId, IOperatorNodePushable> startOperatorNodePushables = new HashMap<ActivityId, IOperatorNodePushable>();
-        Queue<Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>>> childQueue = new LinkedList<Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>>>();
-        List<IConnectorDescriptor> outputConnectors = null;
+        Map<ActivityId, IOperatorNodePushable> startOperatorNodePushables = new HashMap<>();
+        Queue<Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>>> childQueue = new LinkedList<>();
+        List<IConnectorDescriptor> outputConnectors;
 
         /**
          * Set up the source operators
          */
         for (Entry<ActivityId, IActivity> entry : startActivities.entrySet()) {
-            IOperatorNodePushable opPushable = entry.getValue().createPushRuntime(ctx, recordDescProvider, partition,
-                    nPartitions);
+            IOperatorNodePushable opPushable =
+                    entry.getValue().createPushRuntime(ctx, recordDescProvider, partition, nPartitions);
             startOperatorNodePushables.put(entry.getKey(), opPushable);
             operatorNodePushablesBFSOrder.add(opPushable);
             operatorNodePushables.put(entry.getKey(), opPushable);
@@ -157,16 +175,38 @@
 
         // Sets the startedInitialization flags to be false.
         startedInitialization = new boolean[operatorNodePushablesBFSOrder.size()];
+        completedInitialization = new boolean[operatorNodePushablesBFSOrder.size()];
+        completedDeinitialization = new boolean[operatorNodePushablesBFSOrder.size()];
         Arrays.fill(startedInitialization, false);
+        Arrays.fill(completedInitialization, false);
+        Arrays.fill(completedDeinitialization, false);
     }
 
     @Override
     public void deinitialize() throws HyracksDataException {
         runInParallel((op, index) -> {
             if (startedInitialization[index]) {
-                op.deinitialize();
+                if (!completedInitialization[index]) {
+                    synchronized (op) {
+                        while (!completedInitialization[index]) {
+                            try {
+                                op.wait();
+                            } catch (InterruptedException e) {
+                                // Ignore on purpose. we have to deinitialize
+                            }
+                        }
+                    }
+                }
+                try {
+                    op.deinitialize();
+                } finally {
+                    synchronized (SuperActivityOperatorNodePushable.this) {
+                        completedDeinitialization[index] = true;
+                        SuperActivityOperatorNodePushable.this.notify();
+                    }
+                }
             }
-        });
+        }, false);
     }
 
     @Override
@@ -192,8 +232,7 @@
          */
         Pair<ActivityId, Integer> activityIdInputIndex = parent.getActivityIdInputIndex(index);
         IOperatorNodePushable operatorNodePushable = operatorNodePushables.get(activityIdInputIndex.getLeft());
-        IFrameWriter writer = operatorNodePushable.getInputFrameWriter(activityIdInputIndex.getRight());
-        return writer;
+        return operatorNodePushable.getInputFrameWriter(activityIdInputIndex.getRight());
     }
 
     @Override
@@ -205,25 +244,48 @@
         void runAction(IOperatorNodePushable op, int opIndex) throws HyracksDataException;
     }
 
-    private void runInParallel(OperatorNodePushableAction opAction) throws HyracksDataException {
-        List<Future<Void>> initializationTasks = new ArrayList<>();
+    private void runInParallel(OperatorNodePushableAction opAction, boolean initialize) throws HyracksDataException {
+        List<Future<Void>> tasks = new ArrayList<>();
         try {
             int index = 0;
             // Run one action for all OperatorNodePushables in parallel through a thread pool.
             for (final IOperatorNodePushable op : operatorNodePushablesBFSOrder) {
                 final int opIndex = index++;
-                initializationTasks.add(ctx.getExecutorService().submit(() -> {
+                tasks.add(ctx.getExecutorService().submit(() -> {
                     opAction.runAction(op, opIndex);
                     return null;
                 }));
             }
             // Waits until all parallel actions to finish.
-            for (Future<Void> initializationTask : initializationTasks) {
-                initializationTask.get();
+            for (Future<Void> task : tasks) {
+                task.get();
             }
         } catch (Exception e) {
-            for (Future<Void> initializationTask : initializationTasks) {
-                initializationTask.cancel(true);
+            if (initialize) {
+                actionLock.writeLock().lock();
+                try {
+                    cancelled = true;
+                    for (Future<Void> task : tasks) {
+                        task.cancel(true);
+                    }
+                } finally {
+                    actionLock.writeLock().unlock();
+                }
+            } else {
+                // wait for completion
+                for (int index = 0; index < tasks.size(); index++) {
+                    if (completedInitialization[index] && !completedDeinitialization[index]) {
+                        synchronized (SuperActivityOperatorNodePushable.this) {
+                            while (!completedDeinitialization[index]) {
+                                try {
+                                    SuperActivityOperatorNodePushable.this.wait();
+                                } catch (InterruptedException interrupt) {
+                                    // ignoring on purpose. we have to deinitialize what we have initialized
+                                }
+                            }
+                        }
+                    }
+                }
             }
             throw new HyracksDataException(e);
         }

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1681
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Ie5994f8a51dcf43e42325e89215758c310cd7b99
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <bamousaa@gmail.com>

Mime
View raw message