From: buyingyi@apache.org
To: commits@asterixdb.apache.org
Reply-To: dev@asterixdb.apache.org
Date: Sat, 10 Jun 2017 16:57:03 -0000
Message-Id: <52c4c8903aea4ef6a0a94c8d44860e41@git.apache.org>
Subject: [1/2] asterixdb git commit: Support IFrameWriter contract check.

Repository: asterixdb
Updated Branches:
  refs/heads/master 456cb9fd0 -> 8cf8be67c


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8cf8be67/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java
index f83df3a..cbc6146 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java
@@ -18,6 +18,7 @@
  */
 package org.apache.hyracks.control.common.controllers;
 
+import static org.apache.hyracks.control.common.config.OptionTypes.BOOLEAN;
 import static org.apache.hyracks.control.common.config.OptionTypes.INTEGER;
 import static org.apache.hyracks.control.common.config.OptionTypes.LONG;
 import static org.apache.hyracks.control.common.config.OptionTypes.STRING;
@@ -64,7 +65,8 @@ public class CCConfig extends ControllerConfig {
         CLUSTER_TOPOLOGY(STRING),
         JOB_QUEUE_CLASS(STRING, "org.apache.hyracks.control.cc.scheduler.FIFOJobQueue"),
         JOB_QUEUE_CAPACITY(INTEGER, 4096),
-        JOB_MANAGER_CLASS(STRING, "org.apache.hyracks.control.cc.job.JobManager");
+        JOB_MANAGER_CLASS(STRING, "org.apache.hyracks.control.cc.job.JobManager"),
+        ENFORCE_FRAME_WRITER_PROTOCOL(BOOLEAN, false);
 
         private final IOptionType parser;
         private Object defaultValue;
@@ -156,6 +158,9 @@ public class CCConfig extends ControllerConfig {
                 return "The maximum number of jobs to queue before rejecting new jobs";
             case JOB_MANAGER_CLASS:
                 return "Specify the implementation class name for the job manager";
+            case ENFORCE_FRAME_WRITER_PROTOCOL:
+                return "A flag indicating if runtime should enforce frame writer protocol and detect "
+                        + "bad behaving operators";
             default:
                 throw new IllegalStateException("NYI: " + this);
         }
@@ -357,4 +362,12 @@ public class CCConfig extends ControllerConfig {
     public int getJobQueueCapacity() {
         return getAppConfig().getInt(Option.JOB_QUEUE_CAPACITY);
     }
+
+    public boolean getEnforceFrameWriterProtocol() {
+        return getAppConfig().getBoolean(Option.ENFORCE_FRAME_WRITER_PROTOCOL);
+    }
+
+    public void setEnforceFrameWriterProtocol(boolean enforce) {
+        configManager.set(Option.ENFORCE_FRAME_WRITER_PROTOCOL, enforce);
+    }
 }
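
Note: the new ENFORCE_FRAME_WRITER_PROTOCOL option is only exposed through the CCConfig accessors above; the wiring that turns it into a per-job flag is not part of this hunk. A minimal sketch of one plausible mapping, using only the accessors and JobFlag.ENFORCE_CONTRACT introduced by this change (the JobFlagSelection class and selectFlags method are hypothetical):

import java.util.EnumSet;

import org.apache.hyracks.api.job.JobFlag;
import org.apache.hyracks.control.common.controllers.CCConfig;

public final class JobFlagSelection {
    private JobFlagSelection() {
    }

    // Translate the cluster-controller option into the per-job flag checked by the runtime.
    public static EnumSet<JobFlag> selectFlags(CCConfig ccConfig) {
        EnumSet<JobFlag> flags = EnumSet.noneOf(JobFlag.class);
        if (ccConfig.getEnforceFrameWriterProtocol()) {
            flags.add(JobFlag.ENFORCE_CONTRACT);
        }
        return flags;
    }
}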

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8cf8be67/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
index 42726a7..0fc3be7 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
@@ -64,8 +64,10 @@ public class NCConfig extends ControllerConfig {
         MESSAGING_PUBLIC_ADDRESS(STRING, PUBLIC_ADDRESS),
         MESSAGING_PUBLIC_PORT(INTEGER, MESSAGING_LISTEN_PORT),
         CLUSTER_CONNECT_RETRIES(INTEGER, 5),
-        IODEVICES(STRING_ARRAY, appConfig -> new String[] {
-                FileUtil.joinPath(appConfig.getString(ControllerConfig.Option.DEFAULT_DIR), "iodevice") },
+        IODEVICES(
+                STRING_ARRAY,
+                appConfig -> new String[] {
+                        FileUtil.joinPath(appConfig.getString(ControllerConfig.Option.DEFAULT_DIR), "iodevice") },
                 "/iodevice"),
         NET_THREAD_COUNT(INTEGER, 1),
         NET_BUFFER_COUNT(INTEGER, 1),
@@ -95,7 +97,7 @@ public class NCConfig extends ControllerConfig {
         }
         Option(IOptionType parser, Function defaultValue,
-                String defaultValueDescription) {
+               String defaultValueDescription) {
             this.parser = parser;
             this.defaultValue = defaultValue;
             this.defaultValueDescription = defaultValueDescription;
         }
@@ -246,6 +248,7 @@ public class NCConfig extends ControllerConfig {
         return configManager;
     }
 
+    @Override
     public IApplicationConfig getAppConfig() {
         return appConfig;
     }


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8cf8be67/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
index 04d48f3..d689bc0 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
@@ -52,6 +52,7 @@ import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.api.io.IIOManager;
 import org.apache.hyracks.api.io.IWorkspaceFileFactory;
 import org.apache.hyracks.api.job.IOperatorEnvironment;
+import org.apache.hyracks.api.job.JobFlag;
 import org.apache.hyracks.api.job.profiling.counters.ICounter;
 import org.apache.hyracks.api.job.profiling.counters.ICounterContext;
 import org.apache.hyracks.api.partitions.PartitionId;
@@ -104,9 +105,13 @@ public class Task implements IHyracksTaskContext, ICounterContext, Runnable {
 
     private Object sharedObject;
 
-    public Task(Joblet joblet, TaskAttemptId taskId, String displayName, ExecutorService executor,
-            NodeControllerService ncs, List> inputChannelsFromConnectors) {
+    private final Set jobFlags;
+
+    public Task(Joblet joblet, Set jobFlags, TaskAttemptId taskId, String displayName,
+            ExecutorService executor, NodeControllerService ncs,
+            List> inputChannelsFromConnectors) {
         this.joblet = joblet;
+        this.jobFlags = jobFlags;
         this.taskAttemptId = taskId;
         this.displayName = displayName;
         this.executorService = executor;
@@ -426,4 +431,9 @@ public class Task implements IHyracksTaskContext, ICounterContext, Runnable {
     public Object getSharedObject() {
         return sharedObject;
     }
+
+    @Override
+    public Set getJobFlags() {
+        return jobFlags;
+    }
 }
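
Note: Task now carries the job flags and exposes them through getJobFlags(), so the flags chosen at job start are visible to any code holding the task context. A minimal sketch of an operator-side check, assuming the returned set is a Set of JobFlag as in Task and TestTaskContext (the ContractChecks class and isEnforced method are hypothetical):

import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.job.JobFlag;

public final class ContractChecks {
    private ContractChecks() {
    }

    public static boolean isEnforced(IHyracksTaskContext ctx) {
        // True when the job was started with the contract-enforcement flag.
        return ctx.getJobFlags().contains(JobFlag.ENFORCE_CONTRACT);
    }
}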

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8cf8be67/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
index 02e8051..95f3e83 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
@@ -37,6 +37,7 @@ import org.apache.hyracks.api.comm.PartitionChannel;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.ActivityId;
 import org.apache.hyracks.api.dataflow.ConnectorDescriptorId;
+import org.apache.hyracks.api.dataflow.EnforceFrameWriter;
 import org.apache.hyracks.api.dataflow.IActivity;
 import org.apache.hyracks.api.dataflow.IConnectorDescriptor;
 import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
@@ -131,12 +132,10 @@ public class StartTasksWork extends AbstractWork {
             }
             final int partition = tid.getPartition();
             List inputs = ac.getActivityInputMap().get(aid);
-            task = new Task(joblet, taId, han.getClass().getName(), ncs.getExecutor(), ncs,
+            task = new Task(joblet, flags, taId, han.getClass().getName(), ncs.getExecutor(), ncs,
                     createInputChannels(td, inputs));
             IOperatorNodePushable operator = han.createPushRuntime(task, rdp, partition, td.getPartitionCount());
-
             List collectors = new ArrayList<>();
-
             if (inputs != null) {
                 for (int i = 0; i < inputs.size(); ++i) {
                     IConnectorDescriptor conn = inputs.get(i);
@@ -145,26 +144,28 @@ public class StartTasksWork extends AbstractWork {
                     IConnectorPolicy cPolicy = connectorPoliciesMap.get(conn.getConnectorId());
                     if (LOGGER.isLoggable(Level.INFO)) {
                         LOGGER.info("input: " + i + ": " + conn.getConnectorId());
                     }
                     RecordDescriptor recordDesc = ac.getConnectorRecordDescriptorMap().get(conn.getConnectorId());
-                    IPartitionCollector collector = createPartitionCollector(td, partition, task, i, conn,
-                            recordDesc, cPolicy);
+                    IPartitionCollector collector =
+                            createPartitionCollector(td, partition, task, i, conn, recordDesc, cPolicy);
                     collectors.add(collector);
                 }
             }
             List outputs = ac.getActivityOutputMap().get(aid);
             if (outputs != null) {
+                final boolean enforce = flags.contains(JobFlag.ENFORCE_CONTRACT);
                 for (int i = 0; i < outputs.size(); ++i) {
                     final IConnectorDescriptor conn = outputs.get(i);
                     RecordDescriptor recordDesc = ac.getConnectorRecordDescriptorMap().get(conn.getConnectorId());
                     IConnectorPolicy cPolicy = connectorPoliciesMap.get(conn.getConnectorId());
-                    IPartitionWriterFactory pwFactory = createPartitionWriterFactory(task, cPolicy, jobId, conn,
-                            partition, taId, flags);
+                    IPartitionWriterFactory pwFactory =
+                            createPartitionWriterFactory(task, cPolicy, jobId, conn, partition, taId, flags);
                     if (LOGGER.isLoggable(Level.INFO)) {
                         LOGGER.info("output: " + i + ": " + conn.getConnectorId());
                     }
-                    IFrameWriter writer = conn.createPartitioner(task, recordDesc, pwFactory, partition, td
-                            .getPartitionCount(), td.getOutputPartitionCounts()[i]);
+                    IFrameWriter writer = conn.createPartitioner(task, recordDesc, pwFactory, partition,
+                            td.getPartitionCount(), td.getOutputPartitionCounts()[i]);
+                    writer = enforce ? EnforceFrameWriter.enforce(writer) : writer;
                     operator.setOutputFrameWriter(i, writer, recordDesc);
                 }
             }
@@ -203,11 +204,11 @@ public class StartTasksWork extends AbstractWork {
     private IPartitionCollector createPartitionCollector(TaskAttemptDescriptor td, final int partition, Task task,
             int i, IConnectorDescriptor conn, RecordDescriptor recordDesc, IConnectorPolicy cPolicy)
             throws HyracksDataException {
-        IPartitionCollector collector = conn.createPartitionCollector(task, recordDesc, partition, td
-                .getInputPartitionCounts()[i], td.getPartitionCount());
+        IPartitionCollector collector = conn.createPartitionCollector(task, recordDesc, partition,
+                td.getInputPartitionCounts()[i], td.getPartitionCount());
         if (cPolicy.materializeOnReceiveSide()) {
-            return new ReceiveSideMaterializingCollector(task, ncs.getPartitionManager(), collector, task
-                    .getTaskAttemptId(), ncs.getExecutor());
+            return new ReceiveSideMaterializingCollector(task, ncs.getPartitionManager(), collector,
+                    task.getTaskAttemptId(), ncs.getExecutor());
         } else {
             return collector;
         }
     }
@@ -222,8 +223,9 @@ public class StartTasksWork extends AbstractWork {
             factory = new IPartitionWriterFactory() {
                 @Override
                 public IFrameWriter createFrameWriter(int receiverIndex) throws HyracksDataException {
-                    return new MaterializedPartitionWriter(ctx, ncs.getPartitionManager(), new PartitionId(jobId,
-                            conn.getConnectorId(), senderIndex, receiverIndex), taId, ncs.getExecutor());
+                    return new MaterializedPartitionWriter(ctx, ncs.getPartitionManager(),
+                            new PartitionId(jobId, conn.getConnectorId(), senderIndex, receiverIndex), taId,
+                            ncs.getExecutor());
                 }
             };
         } else {
@@ -231,9 +233,9 @@ public class StartTasksWork extends AbstractWork {
 
                 @Override
                 public IFrameWriter createFrameWriter(int receiverIndex) throws HyracksDataException {
-                    return new MaterializingPipelinedPartition(ctx, ncs.getPartitionManager(), new PartitionId(
-                            jobId, conn.getConnectorId(), senderIndex, receiverIndex), taId, ncs
-                            .getExecutor());
+                    return new MaterializingPipelinedPartition(ctx, ncs.getPartitionManager(),
+                            new PartitionId(jobId, conn.getConnectorId(), senderIndex, receiverIndex), taId,
+                            ncs.getExecutor());
                 }
             };
         }
@@ -241,8 +243,8 @@ public class StartTasksWork extends AbstractWork {
             factory = new IPartitionWriterFactory() {
                 @Override
                 public IFrameWriter createFrameWriter(int receiverIndex) throws HyracksDataException {
-                    return new PipelinedPartition(ctx, ncs.getPartitionManager(), new PartitionId(jobId, conn
-                            .getConnectorId(), senderIndex, receiverIndex), taId);
+                    return new PipelinedPartition(ctx, ncs.getPartitionManager(),
+                            new PartitionId(jobId, conn.getConnectorId(), senderIndex, receiverIndex), taId);
                 }
             };
         }
@@ -272,11 +274,14 @@ public class StartTasksWork extends AbstractWork {
             if (inputAddresses[i] != null) {
                 for (int j = 0; j < inputAddresses[i].length; j++) {
                     NetworkAddress networkAddress = inputAddresses[i][j];
-                    PartitionId pid = new PartitionId(jobId, inputs.get(i).getConnectorId(), j, td
-                            .getTaskAttemptId().getTaskId().getPartition());
-                    PartitionChannel channel = new PartitionChannel(pid, new NetworkInputChannel(ncs
-                            .getNetworkManager(), new InetSocketAddress(InetAddress.getByAddress(networkAddress
-                            .lookupIpAddress()), networkAddress.getPort()), pid, 5));
+                    PartitionId pid = new PartitionId(jobId, inputs.get(i).getConnectorId(), j,
+                            td.getTaskAttemptId().getTaskId().getPartition());
+                    PartitionChannel channel = new PartitionChannel(pid,
+                            new NetworkInputChannel(ncs.getNetworkManager(),
+                                    new InetSocketAddress(
+                                            InetAddress.getByAddress(networkAddress.lookupIpAddress()),
+                                            networkAddress.getPort()),
+                                    pid, 5));
                     channels.add(channel);
                 }
             }
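
Note: with ENFORCE_CONTRACT set, each output writer is wrapped exactly once, right before it is handed to the operator via setOutputFrameWriter. EnforceFrameWriter itself is not part of this diff; the following self-contained sketch only illustrates the kind of lifecycle check such a decorator can perform (open() -> nextFrame()* -> optional fail() -> close()). The Writer interface and ContractCheckingWriter class below are stand-ins, not the actual Hyracks types:

import java.nio.ByteBuffer;

public class ContractCheckingWriter {
    // Stand-in for the frame writer interface, reduced to the calls visible in this diff.
    public interface Writer {
        void open() throws Exception;
        void nextFrame(ByteBuffer buffer) throws Exception;
        void fail() throws Exception;
        void close() throws Exception;
    }

    private enum State { CREATED, OPENED, FAILED, CLOSED }

    public static Writer enforce(Writer delegate) {
        return new Writer() {
            private State state = State.CREATED;

            @Override
            public void open() throws Exception {
                check(state == State.CREATED, "open() after " + state);
                state = State.OPENED;
                delegate.open();
            }

            @Override
            public void nextFrame(ByteBuffer buffer) throws Exception {
                check(state == State.OPENED, "nextFrame() in state " + state);
                delegate.nextFrame(buffer);
            }

            @Override
            public void fail() throws Exception {
                check(state == State.OPENED, "fail() in state " + state);
                state = State.FAILED;
                delegate.fail();
            }

            @Override
            public void close() throws Exception {
                check(state != State.CREATED && state != State.CLOSED, "close() in state " + state);
                state = State.CLOSED;
                delegate.close();
            }
        };
    }

    private static void check(boolean ok, String message) {
        if (!ok) {
            // Surface the violation immediately instead of letting a bad operator hang or corrupt state.
            throw new IllegalStateException("Frame writer contract violated: " + message);
        }
    }
}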

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8cf8be67/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
index 7901141..3105d42 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
@@ -190,6 +190,9 @@ public class PreclusteredGroupWriter implements IFrameWriter {
             }
             aggregator.close();
             aggregateState.close();
+        } catch (Exception e) {
+            appenderWrapper.fail();
+            throw e;
         } finally {
             appenderWrapper.close();
         }
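
Note: this hunk and the AbstractSortRunGenerator hunk further down apply the same rule that the enforced contract expects from producers: if an exception occurs after a downstream writer has been opened, fail() must be called on that writer before the finally block closes it. A self-contained sketch of that shape, using illustrative stand-in types (FrameSink, Body) rather than Hyracks classes:

public class FailThenCloseSketch {
    public interface FrameSink {
        void open() throws Exception;
        void fail() throws Exception;
        void close() throws Exception;
    }

    public interface Body {
        void run() throws Exception;
    }

    public static void writeAll(FrameSink sink, Body produce) throws Exception {
        sink.open();
        try {
            produce.run();      // producing frames may fail at any point
        } catch (Exception e) {
            sink.fail();        // report the failure to the consumer before closing
            throw e;
        } finally {
            sink.close();       // close exactly once, on success and on failure
        }
    }
}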

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8cf8be67/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/NullSinkOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/NullSinkOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/NullSinkOperatorDescriptor.java
index 48e6b35..9ac9296 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/NullSinkOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/NullSinkOperatorDescriptor.java
@@ -18,15 +18,11 @@
  */
 package org.apache.hyracks.dataflow.std.misc;
 
-import java.nio.ByteBuffer;
-
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
 import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
 import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
 
 public class NullSinkOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
     private static final long serialVersionUID = 1L;
@@ -38,22 +34,6 @@ public class NullSinkOperatorDescriptor extends AbstractSingleActivityOperatorDe
     @Override
     public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
             IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
-        return new AbstractUnaryInputSinkOperatorNodePushable() {
-            @Override
-            public void open() throws HyracksDataException {
-            }
-
-            @Override
-            public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-            }
-
-            @Override
-            public void close() throws HyracksDataException {
-            }
-
-            @Override
-            public void fail() throws HyracksDataException {
-            }
-        };
+        return new SinkOperatorNodePushable();
     }
 }


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8cf8be67/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SinkOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SinkOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SinkOperatorDescriptor.java
index fe64472..d987a35 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SinkOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SinkOperatorDescriptor.java
@@ -19,15 +19,11 @@
 package org.apache.hyracks.dataflow.std.misc;
 
-import java.nio.ByteBuffer;
-
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
 import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
 import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
 
 public class SinkOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
     private static final long serialVersionUID = 1L;
@@ -39,22 +35,6 @@ public class SinkOperatorDescriptor extends AbstractSingleActivityOperatorDescri
     @Override
     public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
             IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
-        return new AbstractUnaryInputSinkOperatorNodePushable() {
-            @Override
-            public void open() throws HyracksDataException {
-            }
-
-            @Override
-            public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-            }
-
-            @Override
-            public void close() throws HyracksDataException {
-            }
-
-            @Override
-            public void fail() throws HyracksDataException {
-            }
-        };
+        return new SinkOperatorNodePushable();
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8cf8be67/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SinkOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SinkOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SinkOperatorNodePushable.java
new file mode 100644
index 0000000..85e1bb8
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SinkOperatorNodePushable.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.dataflow.std.misc;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
+
+public class SinkOperatorNodePushable extends AbstractUnaryInputSinkOperatorNodePushable {
+
+    @Override
+    public void open() throws HyracksDataException {
+        // Does nothing.
+    }
+
+    @Override
+    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        // Does nothing.
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+        // Does nothing.
+    }
+
+    @Override
+    public void fail() throws HyracksDataException {
+        // Does nothing.
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8cf8be67/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSortRunGenerator.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSortRunGenerator.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSortRunGenerator.java
index bcebb7d..3c11669 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSortRunGenerator.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSortRunGenerator.java
@@ -63,6 +63,9 @@ public abstract class AbstractSortRunGenerator implements IRunGenerator {
         flushWriter.open();
         try {
             getSorter().flush(flushWriter);
+        } catch (Exception e) {
+            flushWriter.fail();
+            throw e;
         } finally {
             flushWriter.close();
         }


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8cf8be67/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
index 509607c..0352cea 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
@@ -78,6 +78,7 @@ public abstract class IndexSearchOperatorNodePushable extends AbstractUnaryInput
     protected final boolean appendIndexFilter;
     protected ArrayTupleBuilder nonFilterTupleBuild;
     protected final ISearchOperationCallbackFactory searchCallbackFactory;
+    protected boolean failed = false;
 
     public IndexSearchOperatorNodePushable(IHyracksTaskContext ctx, RecordDescriptor inputRecDesc, int partition,
             int[] minFilterFieldIndexes, int[] maxFilterFieldIndexes, IIndexDataflowHelperFactory indexHelperFactory,
@@ -193,7 +194,7 @@ public abstract class IndexSearchOperatorNodePushable extends AbstractUnaryInput
                 writeSearchResults(i);
             }
         } catch (Exception e) {
-            throw new HyracksDataException(e);
+            throw HyracksDataException.create(e);
         }
     }
 
@@ -207,12 +208,15 @@ public abstract class IndexSearchOperatorNodePushable extends AbstractUnaryInput
         HyracksDataException closeException = null;
         if (index != null) {
             // if index == null, then the index open was not successful
-            try {
-                if (appender.getTupleCount() > 0) {
-                    appender.write(writer, true);
+            if (!failed) {
+                try {
+                    if (appender.getTupleCount() > 0) {
+                        appender.write(writer, true);
+                    }
+                } catch (Throwable th) {
+                    writer.fail();
+                    closeException = new HyracksDataException(th);
                 }
-            } catch (Throwable th) {
-                closeException = new HyracksDataException(th);
             }
 
             try {
@@ -251,6 +255,7 @@ public abstract class IndexSearchOperatorNodePushable extends AbstractUnaryInput
 
     @Override
     public void fail() throws HyracksDataException {
+        failed = true;
         writer.fail();
     }
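
Note: the IndexSearchOperatorNodePushable change is the consumer-side counterpart of the contract: once fail() has been observed, close() must not try to flush buffered output downstream, and a flush failure on the normal path now calls writer.fail() before recording the exception. A self-contained sketch of that guard, using illustrative stand-in types (Sink, FlushingOperator) rather than Hyracks classes:

public class FlushAfterFailSketch {
    public interface Sink {
        void write(byte[] data) throws Exception;
        void fail() throws Exception;
        void close() throws Exception;
    }

    public static class FlushingOperator {
        private final Sink downstream;
        private byte[] buffered = new byte[0];
        private boolean failed = false;

        public FlushingOperator(Sink downstream) {
            this.downstream = downstream;
        }

        public void fail() throws Exception {
            failed = true;          // remember the failure...
            downstream.fail();      // ...and propagate it downstream
        }

        public void close() throws Exception {
            try {
                if (!failed && buffered.length > 0) {
                    downstream.write(buffered);   // flush only on the success path
                }
            } finally {
                downstream.close();               // always close exactly once
            }
        }
    }
}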

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8cf8be67/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
index 2171122..b100300 100644
--- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
@@ -20,8 +20,10 @@ package org.apache.hyracks.test.support;
 
 import java.io.Serializable;
 import java.nio.ByteBuffer;
+import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ExecutorService;
 
 import org.apache.hyracks.api.context.IHyracksJobletContext;
@@ -33,6 +35,7 @@ import org.apache.hyracks.api.deployment.DeploymentId;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.api.io.IIOManager;
+import org.apache.hyracks.api.job.JobFlag;
 import org.apache.hyracks.api.job.profiling.counters.ICounterContext;
 import org.apache.hyracks.api.resources.IDeallocatable;
 import org.apache.hyracks.control.nc.io.WorkspaceFileFactory;
@@ -155,4 +158,9 @@ public class TestTaskContext implements IHyracksTaskContext {
     public Object getSharedObject() {
         return sharedObject;
     }
+
+    @Override
+    public Set getJobFlags() {
+        return EnumSet.noneOf(JobFlag.class);
+    }
 }
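
Note: putting the pieces together, any frame writer can be decorated the same way StartTasksWork now does it when the job carries ENFORCE_CONTRACT. A minimal sketch, assuming IFrameWriter lives in org.apache.hyracks.api.comm and that EnforceFrameWriter.enforce(...) returns an IFrameWriter as its use above suggests (the FrameWriterWrapping class and wrapIfRequested helper are hypothetical):

import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.EnforceFrameWriter;
import org.apache.hyracks.api.job.JobFlag;

public final class FrameWriterWrapping {
    private FrameWriterWrapping() {
    }

    public static IFrameWriter wrapIfRequested(IHyracksTaskContext ctx, IFrameWriter writer) {
        // Decorate only when the job asked for contract enforcement, mirroring the
        // "writer = enforce ? EnforceFrameWriter.enforce(writer) : writer" line in StartTasksWork.
        return ctx.getJobFlags().contains(JobFlag.ENFORCE_CONTRACT) ? EnforceFrameWriter.enforce(writer) : writer;
    }
}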