Return-Path: X-Original-To: apmail-tez-commits-archive@minotaur.apache.org Delivered-To: apmail-tez-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 8C18D10B32 for ; Wed, 25 Sep 2013 07:35:54 +0000 (UTC) Received: (qmail 42039 invoked by uid 500); 25 Sep 2013 07:34:10 -0000 Delivered-To: apmail-tez-commits-archive@tez.apache.org Received: (qmail 41353 invoked by uid 500); 25 Sep 2013 07:33:16 -0000 Mailing-List: contact commits-help@tez.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tez.incubator.apache.org Delivered-To: mailing list commits@tez.incubator.apache.org Received: (qmail 40333 invoked by uid 99); 25 Sep 2013 07:32:15 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 25 Sep 2013 07:32:15 +0000 X-ASF-Spam-Status: No, hits=-2002.3 required=5.0 tests=ALL_TRUSTED,RP_MATCHES_RCVD,T_FILL_THIS_FORM_SHORT X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Wed, 25 Sep 2013 07:31:44 +0000 Received: (qmail 39431 invoked by uid 99); 25 Sep 2013 07:31:11 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 25 Sep 2013 07:31:11 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 2C0B5909600; Wed, 25 Sep 2013 07:31:10 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sseth@apache.org To: commits@tez.incubator.apache.org Date: Wed, 25 Sep 2013 07:31:45 -0000 Message-Id: <103b5df4ae3a4294a9050ee17c225fe8@git.apache.org> In-Reply-To: <951a7e7fa257470e83418fce839114b5@git.apache.org> References: <951a7e7fa257470e83418fce839114b5@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [38/50] [abbrv] Rename tez-engine-api to tez-runtime-api and tez-engine is split into 2: - tez-engine-library for user-visible Input/Output/Processor implementations - tez-engine-internals for framework internals X-Virus-Checked: Checked by ClamAV on apache.org http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledMergedInput.java ---------------------------------------------------------------------- diff --git a/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledMergedInput.java b/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledMergedInput.java deleted file mode 100644 index a984b0f..0000000 --- a/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledMergedInput.java +++ /dev/null @@ -1,179 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.tez.engine.lib.input; - -import java.io.IOException; -import java.util.Collections; -import java.util.List; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.RawComparator; -import org.apache.tez.common.TezJobConfig; -import org.apache.tez.common.TezUtils; -import org.apache.tez.common.counters.TaskCounter; -import org.apache.tez.common.counters.TezCounter; -import org.apache.tez.engine.api.Event; -import org.apache.tez.engine.api.KVReader; -import org.apache.tez.engine.api.LogicalInput; -import org.apache.tez.engine.api.TezInputContext; -import org.apache.tez.engine.common.ConfigUtils; -import org.apache.tez.engine.common.ValuesIterator; -import org.apache.tez.engine.common.shuffle.impl.Shuffle; -import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator; - -/** - * ShuffleMergedInput in a {@link LogicalInput} which shuffles - * intermediate sorted data, merges them and provides key/ to the - * consumer. - * - * The Copy and Merge will be triggered by the initialization - which is handled - * by the Tez framework. Input is not consumable until the Copy and Merge are - * complete. Methods are provided to check for this, as well as to wait for - * completion. Attempting to get a reader on a non-complete input will block. - * - */ -public class ShuffledMergedInput implements LogicalInput { - - static final Log LOG = LogFactory.getLog(ShuffledMergedInput.class); - - protected TezInputContext inputContext; - protected TezRawKeyValueIterator rawIter = null; - protected Configuration conf; - protected int numInputs = 0; - protected Shuffle shuffle; - @SuppressWarnings("rawtypes") - protected ValuesIterator vIter; - - private TezCounter inputKeyCounter; - private TezCounter inputValueCounter; - - @Override - public List initialize(TezInputContext inputContext) throws IOException { - this.inputContext = inputContext; - this.conf = TezUtils.createConfFromUserPayload(inputContext.getUserPayload()); - - this.inputKeyCounter = inputContext.getCounters().findCounter(TaskCounter.REDUCE_INPUT_GROUPS); - this.inputValueCounter = inputContext.getCounters().findCounter(TaskCounter.REDUCE_INPUT_RECORDS); - this.conf.setStrings(TezJobConfig.LOCAL_DIRS, - inputContext.getWorkDirs()); - - // Start the shuffle - copy and merge. - shuffle = new Shuffle(inputContext, this.conf, numInputs); - shuffle.run(); - - return Collections.emptyList(); - } - - /** - * Check if the input is ready for consumption - * - * @return true if the input is ready for consumption, or if an error occurred - * processing fetching the input. false if the shuffle and merge are - * still in progress - */ - public boolean isInputReady() { - return shuffle.isInputReady(); - } - - /** - * Waits for the input to become ready for consumption - * @throws IOException - * @throws InterruptedException - */ - public void waitForInputReady() throws IOException, InterruptedException { - rawIter = shuffle.waitForInput(); - createValuesIterator(); - } - - @Override - public List close() throws IOException { - rawIter.close(); - return Collections.emptyList(); - } - - /** - * Get a KVReader for the Input.

This method will block until the input is - * ready - i.e. the copy and merge stages are complete. Users can use the - * isInputReady method to check if the input is ready, which gives an - * indication of whether this method will block or not. - * - * NOTE: All values for the current K-V pair must be read prior to invoking - * moveToNext. Once moveToNext() is called, the valueIterator from the - * previous K-V pair will throw an Exception - * - * @return a KVReader over the sorted input. - */ - @Override - public KVReader getReader() throws IOException { - if (rawIter == null) { - try { - waitForInputReady(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new IOException("Interrupted while waiting for input ready", e); - } - } - return new KVReader() { - - @Override - public boolean next() throws IOException { - return vIter.moveToNext(); - } - - @SuppressWarnings("unchecked") - @Override - public KVRecord getCurrentKV() { - return new KVRecord(vIter.getKey(), vIter.getValues()); - } - }; - } - - @Override - public void handleEvents(List inputEvents) { - shuffle.handleEvents(inputEvents); - } - - @Override - public void setNumPhysicalInputs(int numInputs) { - this.numInputs = numInputs; - } - - @SuppressWarnings({ "rawtypes", "unchecked" }) - protected void createValuesIterator() - throws IOException { - vIter = new ValuesIterator(rawIter, - (RawComparator) ConfigUtils.getIntermediateInputKeyComparator(conf), - ConfigUtils.getIntermediateInputKeyClass(conf), - ConfigUtils.getIntermediateInputValueClass(conf), conf, inputKeyCounter, inputValueCounter); - - } - - // This functionality is currently broken. If there's inputs which need to be - // written to disk, there's a possibility that inputs from the different - // sources could clobber each others' output. Also the current structures do - // not have adequate information to de-dupe these (vertex name) -// public void mergeWith(ShuffledMergedInput other) { -// this.numInputs += other.getNumPhysicalInputs(); -// } -// -// public int getNumPhysicalInputs() { -// return this.numInputs; -// } -} http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledMergedInputLegacy.java ---------------------------------------------------------------------- diff --git a/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledMergedInputLegacy.java b/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledMergedInputLegacy.java deleted file mode 100644 index f2da031..0000000 --- a/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledMergedInputLegacy.java +++ /dev/null @@ -1,30 +0,0 @@ -/** - * ShuffleMergedInput in a {@link LogicalInput} which shuffles - * intermediate sorted data, merges them and provides key/ to the - * consumer. - * - * The Copy and Merge will be triggered by the initialization - which is handled - * by the Tez framework. Input is not consumable until the Copy and Merge are - * complete. Methods are provided to check for this, as well as to wait for - * completion. Attempting to get a reader on a non-complete input will block. - * - */ - -package org.apache.tez.engine.lib.input; - -import java.io.IOException; - -import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate; -import org.apache.hadoop.classification.InterfaceAudience.Private; -import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator; - -@LimitedPrivate("mapreduce") -public class ShuffledMergedInputLegacy extends ShuffledMergedInput { - - @Private - public TezRawKeyValueIterator getIterator() throws IOException, InterruptedException { - // wait for input so that iterator is available - waitForInputReady(); - return rawIter; - } -} http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledUnorderedKVInput.java ---------------------------------------------------------------------- diff --git a/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledUnorderedKVInput.java b/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledUnorderedKVInput.java deleted file mode 100644 index 44238fd..0000000 --- a/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledUnorderedKVInput.java +++ /dev/null @@ -1,76 +0,0 @@ -/** -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ - -package org.apache.tez.engine.lib.input; - -import java.util.List; - -import org.apache.hadoop.conf.Configuration; -import org.apache.tez.common.TezJobConfig; -import org.apache.tez.common.TezUtils; -import org.apache.tez.engine.api.Event; -import org.apache.tez.engine.api.LogicalInput; -import org.apache.tez.engine.api.Reader; -import org.apache.tez.engine.api.TezInputContext; -import org.apache.tez.engine.broadcast.input.BroadcastShuffleManager; - -import com.google.common.base.Preconditions; - -public class ShuffledUnorderedKVInput implements LogicalInput { - - private Configuration conf; - private int numInputs = -1; - private BroadcastShuffleManager shuffleManager; - - - - public ShuffledUnorderedKVInput() { - } - - @Override - public List initialize(TezInputContext inputContext) throws Exception { - Preconditions.checkArgument(numInputs != -1, "Number of Inputs has not been set"); - this.conf = TezUtils.createConfFromUserPayload(inputContext.getUserPayload()); - this.conf.setStrings(TezJobConfig.LOCAL_DIRS, inputContext.getWorkDirs()); - - this.shuffleManager = new BroadcastShuffleManager(inputContext, conf, numInputs); - return null; - } - - @Override - public Reader getReader() throws Exception { - // TODO Auto-generated method stub - return null; - } - - @Override - public void handleEvents(List inputEvents) { - shuffleManager.handleEvents(inputEvents); - } - - @Override - public List close() throws Exception { - this.shuffleManager.shutdown(); - return null; - } - - @Override - public void setNumPhysicalInputs(int numInputs) { - this.numInputs = numInputs; - } -} http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/lib/output/InMemorySortedOutput.java ---------------------------------------------------------------------- diff --git a/tez-engine/src/main/java/org/apache/tez/engine/lib/output/InMemorySortedOutput.java b/tez-engine/src/main/java/org/apache/tez/engine/lib/output/InMemorySortedOutput.java deleted file mode 100644 index 26a01c8..0000000 --- a/tez-engine/src/main/java/org/apache/tez/engine/lib/output/InMemorySortedOutput.java +++ /dev/null @@ -1,81 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.tez.engine.lib.output; - -import java.io.IOException; -import java.util.Collections; -import java.util.List; - -import org.apache.tez.common.TezUtils; -import org.apache.tez.engine.api.Event; -import org.apache.tez.engine.api.KVWriter; -import org.apache.tez.engine.api.LogicalOutput; -import org.apache.tez.engine.api.Output; -import org.apache.tez.engine.api.TezOutputContext; -import org.apache.tez.engine.api.Writer; -import org.apache.tez.engine.common.sort.impl.dflt.InMemoryShuffleSorter; - -/** - * {@link InMemorySortedOutput} is an {@link Output} which sorts key/value pairs - * written to it and persists it to a file. - */ -public class InMemorySortedOutput implements LogicalOutput { - - protected InMemoryShuffleSorter sorter; - protected int numTasks; - protected TezOutputContext outputContext; - - - @Override - public List initialize(TezOutputContext outputContext) - throws IOException { - this.outputContext = outputContext; - this.sorter = new InMemoryShuffleSorter(); - sorter.initialize(outputContext, TezUtils.createConfFromUserPayload(outputContext.getUserPayload()), numTasks); - return Collections.emptyList(); - } - - @Override - public Writer getWriter() throws IOException { - return new KVWriter() { - - @Override - public void write(Object key, Object value) throws IOException { - sorter.write(key, value); - } - }; - } - - @Override - public void handleEvents(List outputEvents) { - // No events expected. - } - - @Override - public void setNumPhysicalOutputs(int numOutputs) { - this.numTasks = numOutputs; - } - - @Override - public List close() throws IOException { - sorter.flush(); - sorter.close(); - // TODO NEWTEZ Event generation - return null; - } -} http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/lib/output/LocalOnFileSorterOutput.java ---------------------------------------------------------------------- diff --git a/tez-engine/src/main/java/org/apache/tez/engine/lib/output/LocalOnFileSorterOutput.java b/tez-engine/src/main/java/org/apache/tez/engine/lib/output/LocalOnFileSorterOutput.java deleted file mode 100644 index 7fd26d7..0000000 --- a/tez-engine/src/main/java/org/apache/tez/engine/lib/output/LocalOnFileSorterOutput.java +++ /dev/null @@ -1,63 +0,0 @@ -/** -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ - -package org.apache.tez.engine.lib.output; - -import java.io.IOException; -import java.util.List; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.tez.engine.api.Event; -import org.apache.tez.engine.common.task.local.output.TezTaskOutput; - -public class LocalOnFileSorterOutput extends OnFileSortedOutput { - - private static final Log LOG = LogFactory.getLog(LocalOnFileSorterOutput.class); - - - - @Override - public List close() throws IOException { - LOG.debug("Closing LocalOnFileSorterOutput"); - super.close(); - - TezTaskOutput mapOutputFile = sorter.getMapOutput(); - FileSystem localFs = FileSystem.getLocal(conf); - - Path src = mapOutputFile.getOutputFile(); - Path dst = - mapOutputFile.getInputFileForWrite( - outputContext.getTaskIndex(), - localFs.getFileStatus(src).getLen()); - - LOG.info("Renaming src = " + src + ", dst = " + dst); - if (LOG.isDebugEnabled()) { - LOG.debug("Renaming src = " + src + ", dst = " + dst); - } - localFs.rename(src, dst); - return null; - } - - @Override - protected List generateDataMovementEventsOnClose() throws IOException { - return null; - } -} http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/lib/output/OnFileSortedOutput.java ---------------------------------------------------------------------- diff --git a/tez-engine/src/main/java/org/apache/tez/engine/lib/output/OnFileSortedOutput.java b/tez-engine/src/main/java/org/apache/tez/engine/lib/output/OnFileSortedOutput.java deleted file mode 100644 index 9c9eba0..0000000 --- a/tez-engine/src/main/java/org/apache/tez/engine/lib/output/OnFileSortedOutput.java +++ /dev/null @@ -1,123 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.tez.engine.lib.output; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.Collections; -import java.util.List; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.yarn.api.ApplicationConstants; -import org.apache.tez.common.TezJobConfig; -import org.apache.tez.common.TezUtils; -import org.apache.tez.engine.api.Event; -import org.apache.tez.engine.api.KVWriter; -import org.apache.tez.engine.api.LogicalOutput; -import org.apache.tez.engine.api.TezOutputContext; -import org.apache.tez.engine.api.events.DataMovementEvent; -import org.apache.tez.engine.common.shuffle.newimpl.ShuffleUserPayloads.DataMovementEventPayloadProto; -import org.apache.tez.engine.common.sort.impl.ExternalSorter; -import org.apache.tez.engine.common.sort.impl.dflt.DefaultSorter; -import org.apache.tez.engine.shuffle.common.ShuffleUtils; - -import com.google.common.collect.Lists; - -/** - * OnFileSortedOutput is an {@link LogicalOutput} which sorts key/value pairs - * written to it and persists it to a file. - */ -public class OnFileSortedOutput implements LogicalOutput { - - protected ExternalSorter sorter; - protected Configuration conf; - protected int numOutputs; - protected TezOutputContext outputContext; - private long startTime; - private long endTime; - - - @Override - public List initialize(TezOutputContext outputContext) - throws IOException { - this.startTime = System.nanoTime(); - this.outputContext = outputContext; - sorter = new DefaultSorter(); - this.conf = TezUtils.createConfFromUserPayload(outputContext.getUserPayload()); - // Initializing this parametr in this conf since it is used in multiple - // places (wherever LocalDirAllocator is used) - TezTaskOutputFiles, - // TezMerger, etc. - this.conf.setStrings(TezJobConfig.LOCAL_DIRS, outputContext.getWorkDirs()); - sorter.initialize(outputContext, conf, numOutputs); - return Collections.emptyList(); - } - - @Override - public KVWriter getWriter() throws IOException { - return new KVWriter() { - @Override - public void write(Object key, Object value) throws IOException { - sorter.write(key, value); - } - }; - } - - @Override - public void handleEvents(List outputEvents) { - // Not expecting any events. - } - - @Override - public void setNumPhysicalOutputs(int numOutputs) { - this.numOutputs = numOutputs; - } - - @Override - public List close() throws IOException { - sorter.flush(); - sorter.close(); - this.endTime = System.nanoTime(); - - return generateDataMovementEventsOnClose(); - } - - protected List generateDataMovementEventsOnClose() throws IOException { - String host = System.getenv(ApplicationConstants.Environment.NM_HOST - .toString()); - ByteBuffer shuffleMetadata = outputContext - .getServiceProviderMetaData(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID); - int shufflePort = ShuffleUtils.deserializeShuffleProviderMetaData(shuffleMetadata); - - DataMovementEventPayloadProto.Builder payloadBuilder = DataMovementEventPayloadProto - .newBuilder(); - payloadBuilder.setHost(host); - payloadBuilder.setPort(shufflePort); - payloadBuilder.setPathComponent(outputContext.getUniqueIdentifier()); - payloadBuilder.setRunDuration((int) ((endTime - startTime) / 1000)); - DataMovementEventPayloadProto payloadProto = payloadBuilder.build(); - byte[] payloadBytes = payloadProto.toByteArray(); - - List events = Lists.newArrayListWithCapacity(numOutputs); - - for (int i = 0; i < numOutputs; i++) { - DataMovementEvent event = new DataMovementEvent(i, payloadBytes); - events.add(event); - } - return events; - } -} http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/lib/output/OnFileUnorderedKVOutput.java ---------------------------------------------------------------------- diff --git a/tez-engine/src/main/java/org/apache/tez/engine/lib/output/OnFileUnorderedKVOutput.java b/tez-engine/src/main/java/org/apache/tez/engine/lib/output/OnFileUnorderedKVOutput.java deleted file mode 100644 index 3ff603f..0000000 --- a/tez-engine/src/main/java/org/apache/tez/engine/lib/output/OnFileUnorderedKVOutput.java +++ /dev/null @@ -1,98 +0,0 @@ -/** -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ - -package org.apache.tez.engine.lib.output; - -import java.nio.ByteBuffer; -import java.util.Collections; -import java.util.List; - -import org.apache.hadoop.yarn.api.ApplicationConstants; -import org.apache.tez.dag.api.TezUncheckedException; -import org.apache.tez.engine.api.Event; -import org.apache.tez.engine.api.KVWriter; -import org.apache.tez.engine.api.LogicalOutput; -import org.apache.tez.engine.api.TezOutputContext; -import org.apache.tez.engine.api.events.DataMovementEvent; -import org.apache.tez.engine.broadcast.output.FileBasedKVWriter; -import org.apache.tez.engine.common.shuffle.newimpl.ShuffleUserPayloads.DataMovementEventPayloadProto; -import org.apache.tez.engine.shuffle.common.ShuffleUtils; - -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; - -public class OnFileUnorderedKVOutput implements LogicalOutput { - - private TezOutputContext outputContext; - private FileBasedKVWriter kvWriter; - - public OnFileUnorderedKVOutput() { - } - - @Override - public List initialize(TezOutputContext outputContext) - throws Exception { - this.outputContext = outputContext; - this.kvWriter = new FileBasedKVWriter(outputContext); - return Collections.emptyList(); - } - - @Override - public KVWriter getWriter() throws Exception { - return kvWriter; - } - - @Override - public void handleEvents(List outputEvents) { - throw new TezUncheckedException("Not expecting any events"); - } - - @Override - public List close() throws Exception { - boolean outputGenerated = this.kvWriter.close(); - DataMovementEventPayloadProto.Builder payloadBuilder = DataMovementEventPayloadProto - .newBuilder(); - - String host = System.getenv(ApplicationConstants.Environment.NM_HOST - .toString()); - ByteBuffer shuffleMetadata = outputContext - .getServiceProviderMetaData(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID); - int shufflePort = ShuffleUtils - .deserializeShuffleProviderMetaData(shuffleMetadata); - payloadBuilder.setOutputGenerated(outputGenerated); - if (outputGenerated) { - payloadBuilder.setHost(host); - payloadBuilder.setPort(shufflePort); - payloadBuilder.setPathComponent(outputContext.getUniqueIdentifier()); - } - DataMovementEventPayloadProto payloadProto = payloadBuilder.build(); - - DataMovementEvent dmEvent = new DataMovementEvent(0, - payloadProto.toByteArray()); - List events = Lists.newArrayListWithCapacity(1); - events.add(dmEvent); - return events; - } - - @Override - public void setNumPhysicalOutputs(int numOutputs) { - Preconditions.checkArgument(numOutputs == 1, - "Number of outputs can only be 1 for " + this.getClass().getName()); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java ---------------------------------------------------------------------- diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java b/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java deleted file mode 100644 index 29063f9..0000000 --- a/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java +++ /dev/null @@ -1,475 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tez.engine.newruntime; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.LinkedBlockingQueue; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.classification.InterfaceAudience.Private; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.util.StringUtils; -import org.apache.tez.dag.api.ProcessorDescriptor; -import org.apache.tez.dag.api.TezUncheckedException; -import org.apache.tez.dag.records.TezTaskAttemptID; -import org.apache.tez.engine.api.Event; -import org.apache.tez.engine.api.Input; -import org.apache.tez.engine.api.LogicalIOProcessor; -import org.apache.tez.engine.api.LogicalInput; -import org.apache.tez.engine.api.LogicalOutput; -import org.apache.tez.engine.api.Output; -import org.apache.tez.engine.api.Processor; -import org.apache.tez.engine.api.TezInputContext; -import org.apache.tez.engine.api.TezOutputContext; -import org.apache.tez.engine.api.TezProcessorContext; -import org.apache.tez.engine.api.impl.EventMetaData; -import org.apache.tez.engine.api.impl.InputSpec; -import org.apache.tez.engine.api.impl.OutputSpec; -import org.apache.tez.engine.api.impl.TaskSpec; -import org.apache.tez.engine.api.impl.TezEvent; -import org.apache.tez.engine.api.impl.TezInputContextImpl; -import org.apache.tez.engine.api.impl.TezOutputContextImpl; -import org.apache.tez.engine.api.impl.TezProcessorContextImpl; -import org.apache.tez.engine.api.impl.TezUmbilical; -import org.apache.tez.engine.api.impl.EventMetaData.EventProducerConsumerType; -import org.apache.tez.engine.common.security.JobTokenIdentifier; -import org.apache.tez.engine.shuffle.common.ShuffleUtils; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; - -@Private -public class LogicalIOProcessorRuntimeTask extends RuntimeTask { - - private static final Log LOG = LogFactory - .getLog(LogicalIOProcessorRuntimeTask.class); - - private final List inputSpecs; - private final List inputs; - - private final List outputSpecs; - private final List outputs; - - private List inputContexts; - private List outputContexts; - private TezProcessorContext processorContext; - - private final ProcessorDescriptor processorDescriptor; - private final LogicalIOProcessor processor; - - private final Map serviceConsumerMetadata; - - private Map inputMap; - private Map outputMap; - - private LinkedBlockingQueue eventsToBeProcessed; - private Thread eventRouterThread = null; - - private final int appAttemptNumber; - - public LogicalIOProcessorRuntimeTask(TaskSpec taskSpec, int appAttemptNumber, - Configuration tezConf, TezUmbilical tezUmbilical, - Token jobToken) throws IOException { - // TODO Remove jobToken from here post TEZ-421 - super(taskSpec, tezConf, tezUmbilical); - LOG.info("Initializing LogicalIOProcessorRuntimeTask with TaskSpec: " - + taskSpec); - this.inputContexts = new ArrayList(taskSpec.getInputs().size()); - this.outputContexts = new ArrayList(taskSpec.getOutputs().size()); - this.inputSpecs = taskSpec.getInputs(); - this.inputs = createInputs(inputSpecs); - this.outputSpecs = taskSpec.getOutputs(); - this.outputs = createOutputs(outputSpecs); - this.processorDescriptor = taskSpec.getProcessorDescriptor(); - this.processor = createProcessor(processorDescriptor); - this.serviceConsumerMetadata = new HashMap(); - this.serviceConsumerMetadata.put(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID, - ShuffleUtils.convertJobTokenToBytes(jobToken)); - this.eventsToBeProcessed = new LinkedBlockingQueue(); - this.state = State.NEW; - this.appAttemptNumber = appAttemptNumber; - } - - public void initialize() throws Exception { - LOG.info("Initializing LogicalProcessorIORuntimeTask"); - Preconditions.checkState(this.state == State.NEW, "Already initialized"); - this.state = State.INITED; - inputMap = new LinkedHashMap(inputs.size()); - outputMap = new LinkedHashMap(outputs.size()); - - // TODO Maybe close initialized inputs / outputs in case of failure to - // initialize. - // Initialize all inputs. TODO: Multi-threaded at some point. - for (int i = 0; i < inputs.size(); i++) { - String srcVertexName = inputSpecs.get(i).getSourceVertexName(); - initializeInput(inputs.get(i), - inputSpecs.get(i)); - inputMap.put(srcVertexName, inputs.get(i)); - } - - // Initialize all outputs. TODO: Multi-threaded at some point. - for (int i = 0; i < outputs.size(); i++) { - String destVertexName = outputSpecs.get(i).getDestinationVertexName(); - initializeOutput(outputs.get(i), outputSpecs.get(i)); - outputMap.put(destVertexName, outputs.get(i)); - } - - // Initialize processor. - initializeLogicalIOProcessor(); - startRouterThread(); - } - - public void run() throws Exception { - synchronized (this.state) { - Preconditions.checkState(this.state == State.INITED, - "Can only run while in INITED state. Current: " + this.state); - this.state = State.RUNNING; - } - LogicalIOProcessor lioProcessor = (LogicalIOProcessor) processor; - lioProcessor.run(inputMap, outputMap); - } - - public void close() throws Exception { - try { - Preconditions.checkState(this.state == State.RUNNING, - "Can only run while in RUNNING state. Current: " + this.state); - this.state = State.CLOSED; - - // Close the Inputs. - for (int i = 0; i < inputs.size(); i++) { - String srcVertexName = inputSpecs.get(i).getSourceVertexName(); - List closeInputEvents = inputs.get(i).close(); - sendTaskGeneratedEvents(closeInputEvents, - EventProducerConsumerType.INPUT, taskSpec.getVertexName(), - srcVertexName, taskSpec.getTaskAttemptID()); - } - - // Close the Processor. - processor.close(); - - // Close the Outputs. - for (int i = 0; i < outputs.size(); i++) { - String destVertexName = outputSpecs.get(i).getDestinationVertexName(); - List closeOutputEvents = outputs.get(i).close(); - sendTaskGeneratedEvents(closeOutputEvents, - EventProducerConsumerType.OUTPUT, taskSpec.getVertexName(), - destVertexName, taskSpec.getTaskAttemptID()); - } - } finally { - setTaskDone(); - if (eventRouterThread != null) { - eventRouterThread.interrupt(); - } - } - } - - private void initializeInput(Input input, InputSpec inputSpec) - throws Exception { - TezInputContext tezInputContext = createInputContext(inputSpec); - inputContexts.add(tezInputContext); - if (input instanceof LogicalInput) { - ((LogicalInput) input).setNumPhysicalInputs(inputSpec - .getPhysicalEdgeCount()); - } - LOG.info("Initializing Input using InputSpec: " + inputSpec); - List events = input.initialize(tezInputContext); - sendTaskGeneratedEvents(events, EventProducerConsumerType.INPUT, - tezInputContext.getTaskVertexName(), - tezInputContext.getSourceVertexName(), taskSpec.getTaskAttemptID()); - } - - private void initializeOutput(Output output, OutputSpec outputSpec) - throws Exception { - TezOutputContext tezOutputContext = createOutputContext(outputSpec); - outputContexts.add(tezOutputContext); - if (output instanceof LogicalOutput) { - ((LogicalOutput) output).setNumPhysicalOutputs(outputSpec - .getPhysicalEdgeCount()); - } - LOG.info("Initializing Output using OutputSpec: " + outputSpec); - List events = output.initialize(tezOutputContext); - sendTaskGeneratedEvents(events, EventProducerConsumerType.OUTPUT, - tezOutputContext.getTaskVertexName(), - tezOutputContext.getDestinationVertexName(), - taskSpec.getTaskAttemptID()); - } - - private void initializeLogicalIOProcessor() throws Exception { - LOG.info("Initializing processor" - + ", processorClassName=" + processorDescriptor.getClassName()); - TezProcessorContext processorContext = createProcessorContext(); - this.processorContext = processorContext; - processor.initialize(processorContext); - } - - private TezInputContext createInputContext(InputSpec inputSpec) { - TezInputContext inputContext = new TezInputContextImpl(tezConf, - appAttemptNumber, tezUmbilical, taskSpec.getVertexName(), - inputSpec.getSourceVertexName(), taskSpec.getTaskAttemptID(), - tezCounters, - inputSpec.getInputDescriptor().getUserPayload() == null ? taskSpec - .getProcessorDescriptor().getUserPayload() : inputSpec - .getInputDescriptor().getUserPayload(), this, - serviceConsumerMetadata); - return inputContext; - } - - private TezOutputContext createOutputContext(OutputSpec outputSpec) { - TezOutputContext outputContext = new TezOutputContextImpl(tezConf, - appAttemptNumber, tezUmbilical, taskSpec.getVertexName(), - outputSpec.getDestinationVertexName(), taskSpec.getTaskAttemptID(), - tezCounters, - outputSpec.getOutputDescriptor().getUserPayload() == null ? taskSpec - .getProcessorDescriptor().getUserPayload() : outputSpec - .getOutputDescriptor().getUserPayload(), this, - serviceConsumerMetadata); - return outputContext; - } - - private TezProcessorContext createProcessorContext() { - TezProcessorContext processorContext = new TezProcessorContextImpl(tezConf, - appAttemptNumber, tezUmbilical, taskSpec.getVertexName(), taskSpec.getTaskAttemptID(), - tezCounters, processorDescriptor.getUserPayload(), this, - serviceConsumerMetadata); - return processorContext; - } - - private List createInputs(List inputSpecs) { - List inputs = new ArrayList(inputSpecs.size()); - for (InputSpec inputSpec : inputSpecs) { - LOG.info("Creating Input from InputSpec: " - + inputSpec); - Input input = RuntimeUtils.createClazzInstance(inputSpec - .getInputDescriptor().getClassName()); - - if (input instanceof LogicalInput) { - inputs.add((LogicalInput) input); - } else { - throw new TezUncheckedException(input.getClass().getName() - + " is not a sub-type of LogicalInput." - + " Only LogicalInput sub-types supported by LogicalIOProcessor."); - } - } - return inputs; - } - - private List createOutputs(List outputSpecs) { - List outputs = new ArrayList( - outputSpecs.size()); - for (OutputSpec outputSpec : outputSpecs) { - LOG.info("Creating Output from OutputSpec" - + outputSpec); - Output output = RuntimeUtils.createClazzInstance(outputSpec - .getOutputDescriptor().getClassName()); - if (output instanceof LogicalOutput) { - outputs.add((LogicalOutput) output); - } else { - throw new TezUncheckedException(output.getClass().getName() - + " is not a sub-type of LogicalOutput." - + " Only LogicalOutput sub-types supported by LogicalIOProcessor."); - } - } - return outputs; - } - - private LogicalIOProcessor createProcessor( - ProcessorDescriptor processorDescriptor) { - Processor processor = RuntimeUtils.createClazzInstance(processorDescriptor - .getClassName()); - if (!(processor instanceof LogicalIOProcessor)) { - throw new TezUncheckedException(processor.getClass().getName() - + " is not a sub-type of LogicalIOProcessor." - + " Only LogicalOutput sub-types supported by LogicalIOProcessor."); - } - return (LogicalIOProcessor) processor; - } - - private void sendTaskGeneratedEvents(List events, - EventProducerConsumerType generator, String taskVertexName, - String edgeVertexName, TezTaskAttemptID taskAttemptID) { - if (events == null || events.isEmpty()) { - return; - } - EventMetaData eventMetaData = new EventMetaData(generator, - taskVertexName, edgeVertexName, taskAttemptID); - List tezEvents = new ArrayList(events.size()); - for (Event e : events) { - TezEvent te = new TezEvent(e, eventMetaData); - tezEvents.add(te); - } - if (LOG.isDebugEnabled()) { - for (TezEvent e : tezEvents) { - LOG.debug("Generated event info" - + ", eventMetaData=" + eventMetaData.toString() - + ", eventType=" + e.getEventType()); - } - } - tezUmbilical.addEvents(tezEvents); - } - - private boolean handleEvent(TezEvent e) { - if (LOG.isDebugEnabled()) { - LOG.debug("Handling TezEvent in task" - + ", taskAttemptId=" + taskSpec.getTaskAttemptID() - + ", eventType=" + e.getEventType() - + ", eventSourceInfo=" + e.getSourceInfo() - + ", eventDestinationInfo=" + e.getDestinationInfo()); - } - try { - switch (e.getDestinationInfo().getEventGenerator()) { - case INPUT: - LogicalInput input = inputMap.get( - e.getDestinationInfo().getEdgeVertexName()); - if (input != null) { - input.handleEvents(Collections.singletonList(e.getEvent())); - } else { - throw new TezUncheckedException("Unhandled event for invalid target: " - + e); - } - break; - case OUTPUT: - LogicalOutput output = outputMap.get( - e.getDestinationInfo().getEdgeVertexName()); - if (output != null) { - output.handleEvents(Collections.singletonList(e.getEvent())); - } else { - throw new TezUncheckedException("Unhandled event for invalid target: " - + e); - } - break; - case PROCESSOR: - processor.handleEvents(Collections.singletonList(e.getEvent())); - break; - case SYSTEM: - LOG.warn("Trying to send a System event in a Task: " + e); - break; - } - } catch (Throwable t) { - LOG.warn("Failed to handle event", t); - setFatalError(t, "Failed to handle event"); - EventMetaData sourceInfo = new EventMetaData( - e.getDestinationInfo().getEventGenerator(), - taskSpec.getVertexName(), e.getDestinationInfo().getEdgeVertexName(), - getTaskAttemptID()); - tezUmbilical.signalFatalError(getTaskAttemptID(), - StringUtils.stringifyException(t), sourceInfo); - return false; - } - return true; - } - - @Override - public synchronized void handleEvents(Collection events) { - if (events == null || events.isEmpty()) { - return; - } - eventCounter.addAndGet(events.size()); - if (LOG.isDebugEnabled()) { - LOG.debug("Received events to be processed by task" - + ", taskAttemptId=" + taskSpec.getTaskAttemptID() - + ", eventCount=" + events.size() - + ", newEventCounter=" + eventCounter.get()); - } - eventsToBeProcessed.addAll(events); - } - - private void startRouterThread() { - eventRouterThread = new Thread(new Runnable() { - public void run() { - while (!isTaskDone() && !Thread.currentThread().isInterrupted()) { - try { - TezEvent e = eventsToBeProcessed.take(); - if (e == null) { - continue; - } - // TODO TODONEWTEZ - if (!handleEvent(e)) { - LOG.warn("Stopping Event Router thread as failed to handle" - + " event: " + e); - return; - } - } catch (InterruptedException e) { - if (!isTaskDone()) { - LOG.warn("Event Router thread interrupted. Returning."); - } - return; - } - } - } - }); - - eventRouterThread.setName("TezTaskEventRouter[" - + taskSpec.getTaskAttemptID().toString() + "]"); - eventRouterThread.start(); - } - - public synchronized void cleanup() { - setTaskDone(); - if (eventRouterThread != null) { - eventRouterThread.interrupt(); - } - } - - @Private - @VisibleForTesting - public List getInputContexts() { - return this.inputContexts; - } - - @Private - @VisibleForTesting - public List getOutputContexts() { - return this.outputContexts; - } - - @Private - @VisibleForTesting - public TezProcessorContext getProcessorContext() { - return this.processorContext; - } - - @Private - @VisibleForTesting - public Map getInputs() { - return this.inputMap; - } - - @Private - @VisibleForTesting - public Map getOutputs() { - return this.outputMap; - } - - @Private - @VisibleForTesting - public LogicalIOProcessor getProcessor() { - return this.processor; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/newruntime/RuntimeTask.java ---------------------------------------------------------------------- diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newruntime/RuntimeTask.java b/tez-engine/src/main/java/org/apache/tez/engine/newruntime/RuntimeTask.java deleted file mode 100644 index 22cbc7c..0000000 --- a/tez-engine/src/main/java/org/apache/tez/engine/newruntime/RuntimeTask.java +++ /dev/null @@ -1,106 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tez.engine.newruntime; - -import java.util.Collection; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; - -import org.apache.hadoop.conf.Configuration; -import org.apache.tez.common.counters.TezCounters; -import org.apache.tez.dag.records.TezTaskAttemptID; -import org.apache.tez.engine.api.impl.TaskSpec; -import org.apache.tez.engine.api.impl.TezEvent; -import org.apache.tez.engine.api.impl.TezUmbilical; - -public abstract class RuntimeTask { - - protected AtomicBoolean hasFatalError = new AtomicBoolean(false); - protected Throwable fatalError = null; - protected String fatalErrorMessage = null; - protected float progress; - protected final TezCounters tezCounters; - protected final TaskSpec taskSpec; - protected final Configuration tezConf; - protected final TezUmbilical tezUmbilical; - protected final AtomicInteger eventCounter; - private final AtomicBoolean taskDone; - - protected RuntimeTask(TaskSpec taskSpec, Configuration tezConf, - TezUmbilical tezUmbilical) { - this.taskSpec = taskSpec; - this.tezConf = tezConf; - this.tezUmbilical = tezUmbilical; - this.tezCounters = new TezCounters(); - this.eventCounter = new AtomicInteger(0); - this.progress = 0.0f; - this.taskDone = new AtomicBoolean(false); - } - - protected enum State { - NEW, INITED, RUNNING, CLOSED; - } - - protected State state; - - public String getVertexName() { - return taskSpec.getVertexName(); - } - - public void setFatalError(Throwable t, String message) { - hasFatalError.set(true); - this.fatalError = t; - this.fatalErrorMessage = message; - } - - public boolean hadFatalError() { - return hasFatalError.get(); - } - - public synchronized void setProgress(float progress) { - this.progress = progress; - } - - public synchronized float getProgress() { - return this.progress; - } - - public TezCounters getCounters() { - return this.tezCounters; - } - - public TezTaskAttemptID getTaskAttemptID() { - return taskSpec.getTaskAttemptID(); - } - - public abstract void handleEvents(Collection events); - - public int getEventCounter() { - return eventCounter.get(); - } - - public boolean isTaskDone() { - return taskDone.get(); - } - - protected void setTaskDone() { - taskDone.set(true); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/newruntime/RuntimeUtils.java ---------------------------------------------------------------------- diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newruntime/RuntimeUtils.java b/tez-engine/src/main/java/org/apache/tez/engine/newruntime/RuntimeUtils.java deleted file mode 100644 index 20a029e..0000000 --- a/tez-engine/src/main/java/org/apache/tez/engine/newruntime/RuntimeUtils.java +++ /dev/null @@ -1,62 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tez.engine.newruntime; - -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -import org.apache.tez.dag.api.TezUncheckedException; - -public class RuntimeUtils { - - private static final Map> CLAZZ_CACHE = new ConcurrentHashMap>(); - - private static Class getClazz(String className) { - Class clazz = CLAZZ_CACHE.get(className); - if (clazz == null) { - try { - clazz = Class.forName(className); - } catch (ClassNotFoundException e) { - throw new TezUncheckedException("Unable to load class: " + className, e); - } - } - return clazz; - } - - private static T getNewInstance(Class clazz) { - T instance; - try { - instance = clazz.newInstance(); - } catch (InstantiationException e) { - throw new TezUncheckedException( - "Unable to instantiate class with 0 arguments: " + clazz.getName(), e); - } catch (IllegalAccessException e) { - throw new TezUncheckedException( - "Unable to instantiate class with 0 arguments: " + clazz.getName(), e); - } - return instance; - } - - public static T createClazzInstance(String className) { - Class clazz = getClazz(className); - @SuppressWarnings("unchecked") - T instance = (T) getNewInstance(clazz); - return instance; - } -} http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/shuffle/common/DiskFetchedInput.java ---------------------------------------------------------------------- diff --git a/tez-engine/src/main/java/org/apache/tez/engine/shuffle/common/DiskFetchedInput.java b/tez-engine/src/main/java/org/apache/tez/engine/shuffle/common/DiskFetchedInput.java deleted file mode 100644 index 531e460..0000000 --- a/tez-engine/src/main/java/org/apache/tez/engine/shuffle/common/DiskFetchedInput.java +++ /dev/null @@ -1,111 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tez.engine.shuffle.common; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.LocalDirAllocator; -import org.apache.hadoop.fs.Path; -import org.apache.tez.engine.common.InputAttemptIdentifier; -import org.apache.tez.engine.common.task.local.output.TezTaskOutputFiles; - -import com.google.common.base.Preconditions; - -public class DiskFetchedInput extends FetchedInput { - - private static final Log LOG = LogFactory.getLog(DiskFetchedInput.class); - - private final FileSystem localFS; - private final Path tmpOutputPath; - private final Path outputPath; - - public DiskFetchedInput(long size, - InputAttemptIdentifier inputAttemptIdentifier, - FetchedInputCallback callbackHandler, Configuration conf, - LocalDirAllocator localDirAllocator, TezTaskOutputFiles filenameAllocator) - throws IOException { - super(Type.DISK, size, inputAttemptIdentifier, callbackHandler); - - this.localFS = FileSystem.getLocal(conf); - this.outputPath = filenameAllocator.getInputFileForWrite( - this.inputAttemptIdentifier.getInputIdentifier().getSrcTaskIndex(), size); - this.tmpOutputPath = outputPath.suffix(String.valueOf(id)); - } - - @Override - public OutputStream getOutputStream() throws IOException { - return localFS.create(tmpOutputPath); - } - - @Override - public InputStream getInputStream() throws IOException { - return localFS.open(outputPath); - } - - @Override - public void commit() throws IOException { - if (state == State.PENDING) { - state = State.COMMITTED; - localFS.rename(tmpOutputPath, outputPath); - notifyFetchComplete(); - } - } - - @Override - public void abort() throws IOException { - if (state == State.PENDING) { - state = State.ABORTED; - // TODO NEWTEZ Maybe defer this to container cleanup - localFS.delete(tmpOutputPath, false); - notifyFetchFailure(); - } - } - - @Override - public void free() { - Preconditions.checkState( - state == State.COMMITTED || state == State.ABORTED, - "FetchedInput can only be freed after it is committed or aborted"); - if (state == State.COMMITTED) { - state = State.FREED; - try { - // TODO NEWTEZ Maybe defer this to container cleanup - localFS.delete(outputPath, false); - } catch (IOException e) { - // Ignoring the exception, will eventually be cleaned by container - // cleanup. - LOG.warn("Failed to remvoe file : " + outputPath.toString()); - } - notifyFreedResource(); - } - } - - @Override - public String toString() { - return "DiskFetchedInput [outputPath=" + outputPath - + ", inputAttemptIdentifier=" + inputAttemptIdentifier + ", size=" - + size + ", type=" + type + ", id=" + id + ", state=" + state + "]"; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/shuffle/common/FetchResult.java ---------------------------------------------------------------------- diff --git a/tez-engine/src/main/java/org/apache/tez/engine/shuffle/common/FetchResult.java b/tez-engine/src/main/java/org/apache/tez/engine/shuffle/common/FetchResult.java deleted file mode 100644 index fb0b324..0000000 --- a/tez-engine/src/main/java/org/apache/tez/engine/shuffle/common/FetchResult.java +++ /dev/null @@ -1,70 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tez.engine.shuffle.common; -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import org.apache.tez.engine.common.InputAttemptIdentifier; - -public class FetchResult { - - private final String host; - private final int port; - private final int partition; - private final Iterable pendingInputs; - - public FetchResult(String host, int port, int partition, - Iterable pendingInputs) { - this.host = host; - this.port = port; - this.partition = partition; - this.pendingInputs = pendingInputs; - } - - public String getHost() { - return host; - } - - public int getPort() { - return port; - } - - public int getPartition() { - return partition; - } - - public Iterable getPendingInputs() { - return pendingInputs; - } -} http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/shuffle/common/FetchedInput.java ---------------------------------------------------------------------- diff --git a/tez-engine/src/main/java/org/apache/tez/engine/shuffle/common/FetchedInput.java b/tez-engine/src/main/java/org/apache/tez/engine/shuffle/common/FetchedInput.java deleted file mode 100644 index f5339d3..0000000 --- a/tez-engine/src/main/java/org/apache/tez/engine/shuffle/common/FetchedInput.java +++ /dev/null @@ -1,144 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tez.engine.shuffle.common; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.concurrent.atomic.AtomicInteger; - -import org.apache.hadoop.classification.InterfaceAudience.Private; -import org.apache.tez.engine.common.InputAttemptIdentifier; - -@Private -public abstract class FetchedInput { - - public static enum Type { - WAIT, // TODO NEWTEZ Implement this, only if required. - MEMORY, - DISK, - } - - protected static enum State { - PENDING, COMMITTED, ABORTED, FREED - } - - private static AtomicInteger ID_GEN = new AtomicInteger(0); - - protected InputAttemptIdentifier inputAttemptIdentifier; - protected final long size; - protected final Type type; - protected final FetchedInputCallback callback; - protected final int id; - protected State state; - - public FetchedInput(Type type, long size, - InputAttemptIdentifier inputAttemptIdentifier, - FetchedInputCallback callbackHandler) { - this.type = type; - this.size = size; - this.inputAttemptIdentifier = inputAttemptIdentifier; - this.callback = callbackHandler; - this.id = ID_GEN.getAndIncrement(); - this.state = State.PENDING; - } - - public Type getType() { - return this.type; - } - - public long getSize() { - return this.size; - } - - public InputAttemptIdentifier getInputAttemptIdentifier() { - return this.inputAttemptIdentifier; - } - - /** - * Inform the Allocator about a committed resource. - * This should be called by commit - */ - public void notifyFetchComplete() { - this.callback.fetchComplete(this); - } - - /** - * Inform the Allocator about a failed resource. - * This should be called by abort - */ - public void notifyFetchFailure() { - this.callback.fetchFailed(this); - } - - /** - * Inform the Allocator about a completed resource being released. - * This should be called by free - */ - public void notifyFreedResource() { - this.callback.freeResources(this); - } - - /** - * Returns the output stream to be used to write fetched data. Users are - * expected to close the OutputStream when they're done - */ - public abstract OutputStream getOutputStream() throws IOException; - - /** - * Return an input stream to be used to read the previously fetched data. - * Users are expected to close the InputStream when they're done - */ - public abstract InputStream getInputStream() throws IOException; - - /** - * Commit the output. Should be idempotent - */ - public abstract void commit() throws IOException; - - /** - * Abort the output. Should be idempotent - */ - public abstract void abort() throws IOException; - - /** - * Called when this input has been consumed, so that resources can be - * reclaimed. - */ - public abstract void free(); - - @Override - public int hashCode() { - return id; - } - - @Override - public boolean equals(Object obj) { - if (this == obj) - return true; - if (obj == null) - return false; - if (getClass() != obj.getClass()) - return false; - FetchedInput other = (FetchedInput) obj; - if (id != other.id) - return false; - return true; - } -} http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/shuffle/common/FetchedInputAllocator.java ---------------------------------------------------------------------- diff --git a/tez-engine/src/main/java/org/apache/tez/engine/shuffle/common/FetchedInputAllocator.java b/tez-engine/src/main/java/org/apache/tez/engine/shuffle/common/FetchedInputAllocator.java deleted file mode 100644 index 7e573f0..0000000 --- a/tez-engine/src/main/java/org/apache/tez/engine/shuffle/common/FetchedInputAllocator.java +++ /dev/null @@ -1,31 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tez.engine.shuffle.common; - -import java.io.IOException; - -import org.apache.tez.engine.common.InputAttemptIdentifier; - -public interface FetchedInputAllocator { - - public FetchedInput allocate(long size, - InputAttemptIdentifier inputAttemptIdentifier) throws IOException; - - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/shuffle/common/FetchedInputCallback.java ---------------------------------------------------------------------- diff --git a/tez-engine/src/main/java/org/apache/tez/engine/shuffle/common/FetchedInputCallback.java b/tez-engine/src/main/java/org/apache/tez/engine/shuffle/common/FetchedInputCallback.java deleted file mode 100644 index 2d2d73b..0000000 --- a/tez-engine/src/main/java/org/apache/tez/engine/shuffle/common/FetchedInputCallback.java +++ /dev/null @@ -1,29 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tez.engine.shuffle.common; - -public interface FetchedInputCallback { - - public void fetchComplete(FetchedInput fetchedInput); - - public void fetchFailed(FetchedInput fetchedInput); - - public void freeResources(FetchedInput fetchedInput); - -}