asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From buyin...@apache.org
Subject incubator-asterixdb-hyracks git commit: ASTERIXDB-1145: Fix error propagating in operators/connectors: 1. When an AbstractUnarySourceOperator instance runs into an exception, it should call writer.fail() first and then throw the exception. 2. An IFrameWr
Date Fri, 13 Nov 2015 06:30:44 GMT
Repository: incubator-asterixdb-hyracks
Updated Branches:
  refs/heads/master 4c3145738 -> 8af1963fb


ASTERIXDB-1145: Fix error propagating in operators/connectors:
1. When an AbstractUnarySourceOperator instance runs into an exception, it should call writer.fail()
first and then throw the exception.
2. An IFrameWriter.fail() implementation should not throw yet-another exception, instead,
it should just propgate
   the failure to its downstream operators and optionally set a "failed" state so that in
the close()/nextFrame() method
   it can potentially behave differently from usual close()/nextFrame().

Change-Id: Ifb538155423687c4aa01a0485adeaab87f291547
Reviewed-on: https://asterix-gerrit.ics.uci.edu/491
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/commit/8af1963f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/tree/8af1963f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/diff/8af1963f

Branch: refs/heads/master
Commit: 8af1963fb57ebdb5c43bc93298fe3b6bf1c3f195
Parents: 4c31457
Author: Yingyi Bu <buyingyi@gmail.com>
Authored: Thu Nov 12 21:16:26 2015 -0800
Committer: Yingyi Bu <buyingyi@gmail.com>
Committed: Thu Nov 12 22:27:24 2015 -0800

----------------------------------------------------------------------
 .../meta/AlgebricksMetaOperatorDescriptor.java          |  3 +++
 .../partitions/ReceiveSideMaterializingCollector.java   |  8 +++++---
 .../std/collectors/InputChannelFrameReader.java         | 11 +++++++----
 .../std/collectors/NonDeterministicChannelReader.java   |  8 +++++---
 .../dataflow/TreeIndexStatsOperatorNodePushable.java    | 12 ++++++------
 5 files changed, 26 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/8af1963f/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
index b64071c..8322cdc 100644
--- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
+++ b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
@@ -108,6 +108,9 @@ public class AlgebricksMetaOperatorDescriptor extends AbstractSingleActivityOper
                 try {
                     startOfPipeline.open();
                 } catch (HyracksDataException e) {
+                    // Tell the downstream the job fails.
+                    startOfPipeline.fail();
+                    // Throws the exception.
                     throw e;
                 } finally {
                     startOfPipeline.close();

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/8af1963f/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/ReceiveSideMaterializingCollector.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/ReceiveSideMaterializingCollector.java
b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/ReceiveSideMaterializingCollector.java
index e20c709..cc517e3 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/ReceiveSideMaterializingCollector.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/ReceiveSideMaterializingCollector.java
@@ -138,7 +138,9 @@ public class ReceiveSideMaterializingCollector implements IPartitionCollector
{
                     } else if (eos.get()) {
                         break;
                     } else if (failed.get()) {
-                        throw new HyracksDataException("Failure occurred on input");
+                        // Sends failure notification to its downstream.
+                        // It's not supposed to throw exception here because it is on the
failure notification channel.
+                        mpw.fail();
                     } else {
                         try {
                             synchronized (this) {
@@ -153,8 +155,8 @@ public class ReceiveSideMaterializingCollector implements IPartitionCollector
{
                 }
                 mpw.close();
                 channel.close();
-                delegate.addPartitions(Collections.singleton(new PartitionChannel(pid,
-                        new MaterializedPartitionInputChannel(1, pid, manager))));
+                delegate.addPartitions(Collections
+                        .singleton(new PartitionChannel(pid, new MaterializedPartitionInputChannel(1,
pid, manager))));
             } catch (HyracksException e) {
             }
         }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/8af1963f/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/InputChannelFrameReader.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/InputChannelFrameReader.java
b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/InputChannelFrameReader.java
index 5f63546..11b7c31 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/InputChannelFrameReader.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/InputChannelFrameReader.java
@@ -57,7 +57,9 @@ public class InputChannelFrameReader implements IFrameReader, IInputChannelMonit
             }
         }
         if (failed) {
-            throw new HyracksDataException("Failure occurred on input");
+            // Do not throw exception here to allow the root cause exception gets propagated
to the master first.
+            // Return false to allow the nextFrame(...) call to be a non-op.
+            return false;
         }
         if (availableFrames <= 0 && eos) {
             return false;
@@ -67,12 +69,13 @@ public class InputChannelFrameReader implements IFrameReader, IInputChannelMonit
     }
 
     /**
-     * This implementation works under the truth that one Channel is never shared by two
readers.
+     * This implementation works under the truth that one Channel is neverNonDeterministicChannelReader
shared by two readers.
      * More precisely, one channel only has exact one reader and one writer side.
      *
-     * @param frame outputFrame
+     * @param frame
+     *            outputFrame
      * @return {@code true} if succeed to read the data from the channel to the {@code frame}.
-     * Otherwise return {@code false} if the end of stream is reached.
+     *         Otherwise return {@code false} if the end of stream is reached.
      * @throws HyracksDataException
      */
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/8af1963f/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/NonDeterministicChannelReader.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/NonDeterministicChannelReader.java
b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/NonDeterministicChannelReader.java
index 6dc8d9a..54f1016 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/NonDeterministicChannelReader.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/NonDeterministicChannelReader.java
@@ -106,7 +106,9 @@ public class NonDeterministicChannelReader implements IInputChannelMonitor,
IPar
                 return lastReadSender;
             }
             if (!failSenders.isEmpty()) {
-                throw new HyracksDataException("Failure occurred on input");
+                // Do not throw exception here to allow the root cause exception gets propagated
to the master first.
+                // Return a negative value to allow the nextFrame(...) call to be a non-op.
+                return -1;
             }
             for (int i = eosSenders.nextSetBit(0); i >= 0; i = eosSenders.nextSetBit(i))
{
                 channels[i].close();
@@ -127,8 +129,8 @@ public class NonDeterministicChannelReader implements IInputChannelMonitor,
IPar
     }
 
     public synchronized void close() throws HyracksDataException {
-        for (int i = closedSenders.nextClearBit(0); i >= 0 && i < nSenderPartitions;
i = closedSenders
-                .nextClearBit(i + 1)) {
+        for (int i = closedSenders.nextClearBit(0); i >= 0
+                && i < nSenderPartitions; i = closedSenders.nextClearBit(i + 1))
{
             if (channels[i] != null) {
                 channels[i].close();
                 channels[i] = null;

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/8af1963f/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexStatsOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexStatsOperatorNodePushable.java
b/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexStatsOperatorNodePushable.java
index 584418c..fff3d57 100644
--- a/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexStatsOperatorNodePushable.java
+++ b/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexStatsOperatorNodePushable.java
@@ -70,9 +70,9 @@ public class TreeIndexStatsOperatorNodePushable extends AbstractUnaryOutputSourc
             int indexFileId = fileMapProvider.lookupFileId(treeIndexHelper.getFileReference());
             statsGatherer = new TreeIndexStatsGatherer(bufferCache, treeIndex.getFreePageManager(),
indexFileId,
                     treeIndex.getRootPageId());
-            TreeIndexStats stats = statsGatherer.gatherStats(treeIndex.getLeafFrameFactory().createFrame(),
treeIndex
-                    .getInteriorFrameFactory().createFrame(), treeIndex.getFreePageManager().getMetaDataFrameFactory()
-                    .createFrame());
+            TreeIndexStats stats = statsGatherer.gatherStats(treeIndex.getLeafFrameFactory().createFrame(),
+                    treeIndex.getInteriorFrameFactory().createFrame(),
+                    treeIndex.getFreePageManager().getMetaDataFrameFactory().createFrame());
             // Write the stats output as a single string field.
             FrameTupleAppender appender = new FrameTupleAppender(new VSizeFrame(ctx));
             ArrayTupleBuilder tb = new ArrayTupleBuilder(1);
@@ -81,13 +81,13 @@ public class TreeIndexStatsOperatorNodePushable extends AbstractUnaryOutputSourc
             utf8SerDer.serialize(stats.toString(), dos);
             tb.addFieldEndOffset();
             if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize()))
{
-                throw new HyracksDataException(
-                        "Record size (" + tb.getSize() + ") larger than frame size (" + appender.getBuffer().capacity()
-                                + ")");
+                throw new HyracksDataException("Record size (" + tb.getSize() + ") larger
than frame size ("
+                        + appender.getBuffer().capacity() + ")");
             }
             appender.flush(writer, false);
         } catch (Exception e) {
             writer.fail();
+            throw new HyracksDataException(e);
         } finally {
             writer.close();
             treeIndexHelper.close();


Mime
View raw message