asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From buyin...@apache.org
Subject [1/2] asterixdb git commit: Support IFrameWriter contract check.
Date Sat, 10 Jun 2017 16:57:03 GMT
Repository: asterixdb
Updated Branches:
  refs/heads/master 456cb9fd0 -> 8cf8be67c


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8cf8be67/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java
index f83df3a..cbc6146 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java
@@ -18,6 +18,7 @@
  */
 package org.apache.hyracks.control.common.controllers;
 
+import static org.apache.hyracks.control.common.config.OptionTypes.BOOLEAN;
 import static org.apache.hyracks.control.common.config.OptionTypes.INTEGER;
 import static org.apache.hyracks.control.common.config.OptionTypes.LONG;
 import static org.apache.hyracks.control.common.config.OptionTypes.STRING;
@@ -64,7 +65,8 @@ public class CCConfig extends ControllerConfig {
         CLUSTER_TOPOLOGY(STRING),
         JOB_QUEUE_CLASS(STRING, "org.apache.hyracks.control.cc.scheduler.FIFOJobQueue"),
         JOB_QUEUE_CAPACITY(INTEGER, 4096),
-        JOB_MANAGER_CLASS(STRING, "org.apache.hyracks.control.cc.job.JobManager");
+        JOB_MANAGER_CLASS(STRING, "org.apache.hyracks.control.cc.job.JobManager"),
+        ENFORCE_FRAME_WRITER_PROTOCOL(BOOLEAN, false);
 
         private final IOptionType parser;
         private Object defaultValue;
@@ -156,6 +158,9 @@ public class CCConfig extends ControllerConfig {
                     return "The maximum number of jobs to queue before rejecting new jobs";
                 case JOB_MANAGER_CLASS:
                     return "Specify the implementation class name for the job manager";
+                case ENFORCE_FRAME_WRITER_PROTOCOL:
+                    return "A flag indicating if runtime should enforce frame writer protocol and detect "
+                            + "bad behaving operators";
                 default:
                     throw new IllegalStateException("NYI: " + this);
             }
@@ -357,4 +362,12 @@ public class CCConfig extends ControllerConfig {
     public int getJobQueueCapacity() {
         return getAppConfig().getInt(Option.JOB_QUEUE_CAPACITY);
     }
+
+    public boolean getEnforceFrameWriterProtocol() {
+        return getAppConfig().getBoolean(Option.ENFORCE_FRAME_WRITER_PROTOCOL);
+    }
+
+    public void setEnforceFrameWriterProtocol(boolean enforce) {
+        configManager.set(Option.ENFORCE_FRAME_WRITER_PROTOCOL, enforce);
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8cf8be67/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
index 42726a7..0fc3be7 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
@@ -64,8 +64,10 @@ public class NCConfig extends ControllerConfig {
         MESSAGING_PUBLIC_ADDRESS(STRING, PUBLIC_ADDRESS),
         MESSAGING_PUBLIC_PORT(INTEGER, MESSAGING_LISTEN_PORT),
         CLUSTER_CONNECT_RETRIES(INTEGER, 5),
-        IODEVICES(STRING_ARRAY, appConfig -> new String[] {
-                FileUtil.joinPath(appConfig.getString(ControllerConfig.Option.DEFAULT_DIR), "iodevice") },
+        IODEVICES(
+                STRING_ARRAY,
+                appConfig -> new String[] {
+                        FileUtil.joinPath(appConfig.getString(ControllerConfig.Option.DEFAULT_DIR), "iodevice") },
                 "<value of " + ControllerConfig.Option.DEFAULT_DIR.cmdline() + ">/iodevice"),
         NET_THREAD_COUNT(INTEGER, 1),
         NET_BUFFER_COUNT(INTEGER, 1),
@@ -95,7 +97,7 @@ public class NCConfig extends ControllerConfig {
         }
 
         <T> Option(IOptionType<T> parser, Function<IApplicationConfig, T> defaultValue,
-                   String defaultValueDescription) {
+                String defaultValueDescription) {
             this.parser = parser;
             this.defaultValue = defaultValue;
             this.defaultValueDescription = defaultValueDescription;
@@ -246,6 +248,7 @@ public class NCConfig extends ControllerConfig {
         return configManager;
     }
 
+    @Override
     public IApplicationConfig getAppConfig() {
         return appConfig;
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8cf8be67/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
index 04d48f3..d689bc0 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
@@ -52,6 +52,7 @@ import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.api.io.IIOManager;
 import org.apache.hyracks.api.io.IWorkspaceFileFactory;
 import org.apache.hyracks.api.job.IOperatorEnvironment;
+import org.apache.hyracks.api.job.JobFlag;
 import org.apache.hyracks.api.job.profiling.counters.ICounter;
 import org.apache.hyracks.api.job.profiling.counters.ICounterContext;
 import org.apache.hyracks.api.partitions.PartitionId;
@@ -104,9 +105,13 @@ public class Task implements IHyracksTaskContext, ICounterContext, Runnable {
 
     private Object sharedObject;
 
-    public Task(Joblet joblet, TaskAttemptId taskId, String displayName, ExecutorService executor,
-            NodeControllerService ncs, List<List<PartitionChannel>> inputChannelsFromConnectors) {
+    private final Set<JobFlag> jobFlags;
+
+    public Task(Joblet joblet, Set<JobFlag> jobFlags, TaskAttemptId taskId, String displayName,
+            ExecutorService executor, NodeControllerService ncs,
+            List<List<PartitionChannel>> inputChannelsFromConnectors) {
         this.joblet = joblet;
+        this.jobFlags = jobFlags;
         this.taskAttemptId = taskId;
         this.displayName = displayName;
         this.executorService = executor;
@@ -426,4 +431,9 @@ public class Task implements IHyracksTaskContext, ICounterContext, Runnable {
     public Object getSharedObject() {
         return sharedObject;
     }
+
+    @Override
+    public Set<JobFlag> getJobFlags() {
+        return jobFlags;
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8cf8be67/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
index 02e8051..95f3e83 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
@@ -37,6 +37,7 @@ import org.apache.hyracks.api.comm.PartitionChannel;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.ActivityId;
 import org.apache.hyracks.api.dataflow.ConnectorDescriptorId;
+import org.apache.hyracks.api.dataflow.EnforceFrameWriter;
 import org.apache.hyracks.api.dataflow.IActivity;
 import org.apache.hyracks.api.dataflow.IConnectorDescriptor;
 import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
@@ -131,12 +132,10 @@ public class StartTasksWork extends AbstractWork {
                 }
                 final int partition = tid.getPartition();
                 List<IConnectorDescriptor> inputs = ac.getActivityInputMap().get(aid);
-                task = new Task(joblet, taId, han.getClass().getName(), ncs.getExecutor(), ncs,
+                task = new Task(joblet, flags, taId, han.getClass().getName(), ncs.getExecutor(), ncs,
                         createInputChannels(td, inputs));
                 IOperatorNodePushable operator = han.createPushRuntime(task, rdp, partition, td.getPartitionCount());
-
                 List<IPartitionCollector> collectors = new ArrayList<>();
-
                 if (inputs != null) {
                     for (int i = 0; i < inputs.size(); ++i) {
                         IConnectorDescriptor conn = inputs.get(i);
@@ -145,26 +144,28 @@ public class StartTasksWork extends AbstractWork {
                             LOGGER.info("input: " + i + ": " + conn.getConnectorId());
                         }
                         RecordDescriptor recordDesc = ac.getConnectorRecordDescriptorMap().get(conn.getConnectorId());
-                        IPartitionCollector collector = createPartitionCollector(td, partition, task, i, conn,
-                                recordDesc, cPolicy);
+                        IPartitionCollector collector =
+                                createPartitionCollector(td, partition, task, i, conn, recordDesc, cPolicy);
                         collectors.add(collector);
                     }
                 }
                 List<IConnectorDescriptor> outputs = ac.getActivityOutputMap().get(aid);
                 if (outputs != null) {
+                    final boolean enforce = flags.contains(JobFlag.ENFORCE_CONTRACT);
                     for (int i = 0; i < outputs.size(); ++i) {
                         final IConnectorDescriptor conn = outputs.get(i);
                         RecordDescriptor recordDesc = ac.getConnectorRecordDescriptorMap().get(conn.getConnectorId());
                         IConnectorPolicy cPolicy = connectorPoliciesMap.get(conn.getConnectorId());
 
-                        IPartitionWriterFactory pwFactory = createPartitionWriterFactory(task, cPolicy, jobId, conn,
-                                partition, taId, flags);
+                        IPartitionWriterFactory pwFactory =
+                                createPartitionWriterFactory(task, cPolicy, jobId, conn, partition, taId, flags);
 
                         if (LOGGER.isLoggable(Level.INFO)) {
                             LOGGER.info("output: " + i + ": " + conn.getConnectorId());
                         }
-                        IFrameWriter writer = conn.createPartitioner(task, recordDesc, pwFactory, partition, td
-                                .getPartitionCount(), td.getOutputPartitionCounts()[i]);
+                        IFrameWriter writer = conn.createPartitioner(task, recordDesc, pwFactory, partition,
+                                td.getPartitionCount(), td.getOutputPartitionCounts()[i]);
+                        writer = enforce ? EnforceFrameWriter.enforce(writer) : writer;
                         operator.setOutputFrameWriter(i, writer, recordDesc);
                     }
                 }
@@ -203,11 +204,11 @@ public class StartTasksWork extends AbstractWork {
     private IPartitionCollector createPartitionCollector(TaskAttemptDescriptor td, final int partition, Task task,
             int i, IConnectorDescriptor conn, RecordDescriptor recordDesc, IConnectorPolicy cPolicy)
             throws HyracksDataException {
-        IPartitionCollector collector = conn.createPartitionCollector(task, recordDesc, partition, td
-                .getInputPartitionCounts()[i], td.getPartitionCount());
+        IPartitionCollector collector = conn.createPartitionCollector(task, recordDesc, partition,
+                td.getInputPartitionCounts()[i], td.getPartitionCount());
         if (cPolicy.materializeOnReceiveSide()) {
-            return new ReceiveSideMaterializingCollector(task, ncs.getPartitionManager(), collector, task
-                    .getTaskAttemptId(), ncs.getExecutor());
+            return new ReceiveSideMaterializingCollector(task, ncs.getPartitionManager(), collector,
+                    task.getTaskAttemptId(), ncs.getExecutor());
         } else {
             return collector;
         }
@@ -222,8 +223,9 @@ public class StartTasksWork extends AbstractWork {
                 factory = new IPartitionWriterFactory() {
                     @Override
                     public IFrameWriter createFrameWriter(int receiverIndex) throws HyracksDataException {
-                        return new MaterializedPartitionWriter(ctx, ncs.getPartitionManager(), new PartitionId(jobId,
-                                conn.getConnectorId(), senderIndex, receiverIndex), taId, ncs.getExecutor());
+                        return new MaterializedPartitionWriter(ctx, ncs.getPartitionManager(),
+                                new PartitionId(jobId, conn.getConnectorId(), senderIndex, receiverIndex), taId,
+                                ncs.getExecutor());
                     }
                 };
             } else {
@@ -231,9 +233,9 @@ public class StartTasksWork extends AbstractWork {
 
                     @Override
                     public IFrameWriter createFrameWriter(int receiverIndex) throws HyracksDataException {
-                        return new MaterializingPipelinedPartition(ctx, ncs.getPartitionManager(), new PartitionId(
-                                jobId, conn.getConnectorId(), senderIndex, receiverIndex), taId, ncs
-                                        .getExecutor());
+                        return new MaterializingPipelinedPartition(ctx, ncs.getPartitionManager(),
+                                new PartitionId(jobId, conn.getConnectorId(), senderIndex, receiverIndex), taId,
+                                ncs.getExecutor());
                     }
                 };
             }
@@ -241,8 +243,8 @@ public class StartTasksWork extends AbstractWork {
             factory = new IPartitionWriterFactory() {
                 @Override
                 public IFrameWriter createFrameWriter(int receiverIndex) throws HyracksDataException {
-                    return new PipelinedPartition(ctx, ncs.getPartitionManager(), new PartitionId(jobId, conn
-                            .getConnectorId(), senderIndex, receiverIndex), taId);
+                    return new PipelinedPartition(ctx, ncs.getPartitionManager(),
+                            new PartitionId(jobId, conn.getConnectorId(), senderIndex, receiverIndex), taId);
                 }
             };
         }
@@ -272,11 +274,14 @@ public class StartTasksWork extends AbstractWork {
                 if (inputAddresses[i] != null) {
                     for (int j = 0; j < inputAddresses[i].length; j++) {
                         NetworkAddress networkAddress = inputAddresses[i][j];
-                        PartitionId pid = new PartitionId(jobId, inputs.get(i).getConnectorId(), j, td
-                                .getTaskAttemptId().getTaskId().getPartition());
-                        PartitionChannel channel = new PartitionChannel(pid, new NetworkInputChannel(ncs
-                                .getNetworkManager(), new InetSocketAddress(InetAddress.getByAddress(networkAddress
-                                        .lookupIpAddress()), networkAddress.getPort()), pid, 5));
+                        PartitionId pid = new PartitionId(jobId, inputs.get(i).getConnectorId(), j,
+                                td.getTaskAttemptId().getTaskId().getPartition());
+                        PartitionChannel channel = new PartitionChannel(pid,
+                                new NetworkInputChannel(ncs.getNetworkManager(),
+                                        new InetSocketAddress(
+                                                InetAddress.getByAddress(networkAddress.lookupIpAddress()),
+                                                networkAddress.getPort()),
+                                        pid, 5));
                         channels.add(channel);
                     }
                 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8cf8be67/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
index 7901141..3105d42 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
@@ -190,6 +190,9 @@ public class PreclusteredGroupWriter implements IFrameWriter {
             }
             aggregator.close();
             aggregateState.close();
+        } catch (Exception e) {
+            appenderWrapper.fail();
+            throw e;
         } finally {
             appenderWrapper.close();
         }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8cf8be67/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/NullSinkOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/NullSinkOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/NullSinkOperatorDescriptor.java
index 48e6b35..9ac9296 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/NullSinkOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/NullSinkOperatorDescriptor.java
@@ -18,15 +18,11 @@
  */
 package org.apache.hyracks.dataflow.std.misc;
 
-import java.nio.ByteBuffer;
-
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
 import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
 import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
 
 public class NullSinkOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
     private static final long serialVersionUID = 1L;
@@ -38,22 +34,6 @@ public class NullSinkOperatorDescriptor extends AbstractSingleActivityOperatorDe
     @Override
     public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
             IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
-        return new AbstractUnaryInputSinkOperatorNodePushable() {
-            @Override
-            public void open() throws HyracksDataException {
-            }
-
-            @Override
-            public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-            }
-
-            @Override
-            public void close() throws HyracksDataException {
-            }
-
-            @Override
-            public void fail() throws HyracksDataException {
-            }
-        };
+        return new SinkOperatorNodePushable();
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8cf8be67/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SinkOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SinkOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SinkOperatorDescriptor.java
index fe64472..d987a35 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SinkOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SinkOperatorDescriptor.java
@@ -19,15 +19,11 @@
 
 package org.apache.hyracks.dataflow.std.misc;
 
-import java.nio.ByteBuffer;
-
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
 import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
 import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
 
 public class SinkOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
     private static final long serialVersionUID = 1L;
@@ -39,22 +35,6 @@ public class SinkOperatorDescriptor extends AbstractSingleActivityOperatorDescri
     @Override
     public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
             IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
-        return new AbstractUnaryInputSinkOperatorNodePushable() {
-            @Override
-            public void open() throws HyracksDataException {
-            }
-
-            @Override
-            public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-            }
-
-            @Override
-            public void close() throws HyracksDataException {
-            }
-
-            @Override
-            public void fail() throws HyracksDataException {
-            }
-        };
+        return new SinkOperatorNodePushable();
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8cf8be67/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SinkOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SinkOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SinkOperatorNodePushable.java
new file mode 100644
index 0000000..85e1bb8
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SinkOperatorNodePushable.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.dataflow.std.misc;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
+
+public class SinkOperatorNodePushable extends AbstractUnaryInputSinkOperatorNodePushable {
+
+    @Override
+    public void open() throws HyracksDataException {
+        // Does nothing.
+    }
+
+    @Override
+    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        // Does nothing.
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+        // Does nothing.
+    }
+
+    @Override
+    public void fail() throws HyracksDataException {
+        // Does nothing.
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8cf8be67/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSortRunGenerator.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSortRunGenerator.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSortRunGenerator.java
index bcebb7d..3c11669 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSortRunGenerator.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSortRunGenerator.java
@@ -63,6 +63,9 @@ public abstract class AbstractSortRunGenerator implements IRunGenerator {
         flushWriter.open();
         try {
             getSorter().flush(flushWriter);
+        } catch (Exception e) {
+            flushWriter.fail();
+            throw e;
         } finally {
             flushWriter.close();
         }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8cf8be67/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
index 509607c..0352cea 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
@@ -78,6 +78,7 @@ public abstract class IndexSearchOperatorNodePushable extends AbstractUnaryInput
     protected final boolean appendIndexFilter;
     protected ArrayTupleBuilder nonFilterTupleBuild;
     protected final ISearchOperationCallbackFactory searchCallbackFactory;
+    protected boolean failed = false;
 
     public IndexSearchOperatorNodePushable(IHyracksTaskContext ctx, RecordDescriptor inputRecDesc, int partition,
             int[] minFilterFieldIndexes, int[] maxFilterFieldIndexes, IIndexDataflowHelperFactory indexHelperFactory,
@@ -193,7 +194,7 @@ public abstract class IndexSearchOperatorNodePushable extends AbstractUnaryInput
                 writeSearchResults(i);
             }
         } catch (Exception e) {
-            throw new HyracksDataException(e);
+            throw HyracksDataException.create(e);
         }
     }
 
@@ -207,12 +208,15 @@ public abstract class IndexSearchOperatorNodePushable extends AbstractUnaryInput
         HyracksDataException closeException = null;
         if (index != null) {
             // if index == null, then the index open was not successful
-            try {
-                if (appender.getTupleCount() > 0) {
-                    appender.write(writer, true);
+            if (!failed) {
+                try {
+                    if (appender.getTupleCount() > 0) {
+                        appender.write(writer, true);
+                    }
+                } catch (Throwable th) {
+                    writer.fail();
+                    closeException = new HyracksDataException(th);
                 }
-            } catch (Throwable th) {
-                closeException = new HyracksDataException(th);
             }
 
             try {
@@ -251,6 +255,7 @@ public abstract class IndexSearchOperatorNodePushable extends AbstractUnaryInput
 
     @Override
     public void fail() throws HyracksDataException {
+        failed = true;
         writer.fail();
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8cf8be67/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
index 2171122..b100300 100644
--- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
@@ -20,8 +20,10 @@ package org.apache.hyracks.test.support;
 
 import java.io.Serializable;
 import java.nio.ByteBuffer;
+import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ExecutorService;
 
 import org.apache.hyracks.api.context.IHyracksJobletContext;
@@ -33,6 +35,7 @@ import org.apache.hyracks.api.deployment.DeploymentId;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.api.io.IIOManager;
+import org.apache.hyracks.api.job.JobFlag;
 import org.apache.hyracks.api.job.profiling.counters.ICounterContext;
 import org.apache.hyracks.api.resources.IDeallocatable;
 import org.apache.hyracks.control.nc.io.WorkspaceFileFactory;
@@ -155,4 +158,9 @@ public class TestTaskContext implements IHyracksTaskContext {
     public Object getSharedObject() {
         return sharedObject;
     }
+
+    @Override
+    public Set<JobFlag> getJobFlags() {
+        return EnumSet.noneOf(JobFlag.class);
+    }
 }


Mime
View raw message