Return-Path: X-Original-To: apmail-tez-commits-archive@minotaur.apache.org Delivered-To: apmail-tez-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 1EBED102C4 for ; Wed, 11 Sep 2013 04:50:12 +0000 (UTC) Received: (qmail 66747 invoked by uid 500); 11 Sep 2013 04:50:12 -0000 Delivered-To: apmail-tez-commits-archive@tez.apache.org Received: (qmail 66704 invoked by uid 500); 11 Sep 2013 04:50:10 -0000 Mailing-List: contact commits-help@tez.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tez.incubator.apache.org Delivered-To: mailing list commits@tez.incubator.apache.org Received: (qmail 66697 invoked by uid 99); 11 Sep 2013 04:50:09 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 11 Sep 2013 04:50:09 +0000 X-ASF-Spam-Status: No, hits=-2000.7 required=5.0 tests=ALL_TRUSTED,RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Wed, 11 Sep 2013 04:50:07 +0000 Received: (qmail 65271 invoked by uid 99); 11 Sep 2013 04:49:46 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 11 Sep 2013 04:49:46 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 4E4838BD52F; Wed, 11 Sep 2013 04:49:46 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sseth@apache.org To: commits@tez.incubator.apache.org Date: Wed, 11 Sep 2013 04:49:46 -0000 Message-Id: <71254d4297594ef58dbfc3813b14b4f6@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/4] TEZ-417. Change Shuffle Input/Output to work with the new APIs (part of TEZ-398). 
(sseth) X-Virus-Checked: Checked by ClamAV on apache.org Updated Branches: refs/heads/TEZ-398 e5919fa75 -> 1cf7f197d http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/lib/output/OnFileSortedOutput.java ---------------------------------------------------------------------- diff --git a/tez-engine/src/main/java/org/apache/tez/engine/lib/output/OnFileSortedOutput.java b/tez-engine/src/main/java/org/apache/tez/engine/lib/output/OnFileSortedOutput.java index 79787b7..29a4b02 100644 --- a/tez-engine/src/main/java/org/apache/tez/engine/lib/output/OnFileSortedOutput.java +++ b/tez-engine/src/main/java/org/apache/tez/engine/lib/output/OnFileSortedOutput.java @@ -18,52 +18,103 @@ package org.apache.tez.engine.lib.output; import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.List; import org.apache.hadoop.conf.Configuration; -import org.apache.tez.common.RunningTaskContext; -import org.apache.tez.common.TezEngineTaskContext; -import org.apache.tez.engine.api.Master; -import org.apache.tez.engine.api.Output; -import org.apache.tez.engine.common.sort.SortingOutput; +import org.apache.hadoop.yarn.api.ApplicationConstants; +import org.apache.tez.common.TezJobConfig; +import org.apache.tez.common.TezUtils; +import org.apache.tez.engine.common.shuffle.newimpl.ShuffleUserPayloads.DataMovementEventPayloadProto; import org.apache.tez.engine.common.sort.impl.ExternalSorter; import org.apache.tez.engine.common.sort.impl.dflt.DefaultSorter; -import org.apache.tez.engine.records.OutputContext; +import org.apache.tez.engine.newapi.Event; +import org.apache.tez.engine.newapi.KVWriter; +import org.apache.tez.engine.newapi.LogicalOutput; +import org.apache.tez.engine.newapi.TezOutputContext; +import org.apache.tez.engine.newapi.Writer; +import org.apache.tez.engine.newapi.events.DataMovementEvent; +import org.apache.tez.engine.shuffle.common.ShuffleUtils; + +import 
com.google.common.collect.Lists; /** - * {@link OnFileSortedOutput} is an {@link Output} which sorts key/value pairs + * OnFileSortedOutput is an {@link LogicalOutput} which sorts key/value pairs * written to it and persists it to a file. */ -public class OnFileSortedOutput implements SortingOutput { +public class OnFileSortedOutput implements LogicalOutput { protected ExternalSorter sorter; + protected Configuration conf; + protected int numOutputs; + protected TezOutputContext outputContext; + private long startTime; + private long endTime; - public OnFileSortedOutput(TezEngineTaskContext task) throws IOException { - sorter = new DefaultSorter(task); - } - public void initialize(Configuration conf, Master master) - throws IOException, InterruptedException { - sorter.initialize(conf, master); + @Override + public List initialize(TezOutputContext outputContext) + throws IOException { + this.startTime = System.nanoTime(); + this.outputContext = outputContext; + sorter = new DefaultSorter(); + this.conf = TezUtils.createConfFromUserPayload(outputContext.getUserPayload()); + // Initializing this parameter in this conf since it is used in multiple + // places (wherever LocalDirAllocator is used) - TezTaskOutputFiles, + // TezMerger, etc. 
+ this.conf.setStrings(TezJobConfig.LOCAL_DIRS, outputContext.getWorkDirs()); + sorter.initialize(outputContext, conf, numOutputs); + return Collections.emptyList(); } @Override - public void setTask(RunningTaskContext task) { - sorter.setTask(task); - } - - public void write(Object key, Object value) throws IOException, - InterruptedException { - sorter.write(key, value); + public Writer getWriter() throws IOException { + return new KVWriter() { + @Override + public void write(Object key, Object value) throws IOException { + sorter.write(key, value); + } + }; } - public void close() throws IOException, InterruptedException { - sorter.flush(); - sorter.close(); + @Override + public void handleEvents(List outputEvents) { + // Not expecting any events. } @Override - public OutputContext getOutputContext() { - return null; + public void setNumPhysicalOutputs(int numOutputs) { + this.numOutputs = numOutputs; } + @Override + public List close() throws IOException { + sorter.flush(); + sorter.close(); + this.endTime = System.nanoTime(); + + String host = System.getenv(ApplicationConstants.Environment.NM_HOST + .toString()); + ByteBuffer shuffleMetadata = outputContext + .getServiceProviderMetaData(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID); + int shufflePort = ShuffleUtils.deserializeShuffleMetaData(shuffleMetadata); + + DataMovementEventPayloadProto.Builder payloadBuilder = DataMovementEventPayloadProto + .newBuilder(); + payloadBuilder.setHost(host); + payloadBuilder.setPort(shufflePort); + payloadBuilder.setPathComponent(outputContext.getUniqueIdentifier()); + payloadBuilder.setRunDuration((int) ((endTime - startTime) / 1000)); + DataMovementEventPayloadProto payloadProto = payloadBuilder.build(); + + List events = Lists.newArrayListWithCapacity(numOutputs); + + for (int i = 0; i < numOutputs; i++) { + DataMovementEvent event = new DataMovementEvent(i, + payloadProto.toByteArray()); + events.add(event); + } + return events; + } } 
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/newapi/KVReader.java ---------------------------------------------------------------------- diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/KVReader.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/KVReader.java index 68e0f47..bd0e933 100644 --- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/KVReader.java +++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/KVReader.java @@ -22,33 +22,57 @@ import java.io.IOException; /** * A key/value(s) pair based {@link Reader}. + * + * Example usage + * + * while (kvReader.moveToNext()) { + * KVRecord kvRecord = kvReader.getCurrentKV(); + * Object key = kvRecord.getKey(); + * Iterable values = kvRecord.getValues(); + * + * */ public interface KVReader extends Reader { /** - * Check if there is another key/value(s) pair + * Moves to the next key/value(s) pair * - * @return true if another key/value(s) pair exists + * @return true if another key/value(s) pair exists, false if there are no more. * @throws IOException * if an error occurs */ - public boolean hasNext() throws IOException; + public boolean moveToNext() throws IOException; /** - * Gets the next key. - * - * @return the next key, or null if none exists + * Return the current key/value(s) pair. Use moveToNext() to advance. + * @return the current key/value(s) pair * @throws IOException - * if an error occurs */ - public Object getNextKey() throws IOException; + public KVRecord getCurrentKV() throws IOException; + + + /** - * Get the next values. 
- * - * @return an Iterable view of the values for the current key - * @throws IOException - * if an error occurs + * Represents a key and an associated set of values + * */ - public Iterable getNextValues() throws IOException; + public static class KVRecord { + + private Object key; + private Iterable values; + + public KVRecord(Object key, Iterable values) { + this.key = key; + this.values = values; + } + + public Object getKey() { + return this.key; + } + + public Iterable getValues() { + return this.values; + } + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/newapi/KVWriter.java ---------------------------------------------------------------------- diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/KVWriter.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/KVWriter.java index f945b63..ad48912 100644 --- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/KVWriter.java +++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/KVWriter.java @@ -23,7 +23,7 @@ import java.io.IOException; /** * A key/value(s) pair based {@link Writer} */ -public interface KVWriter { +public interface KVWriter extends Writer { /** * Writes a key/value pair. 
* http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezInputContextImpl.java ---------------------------------------------------------------------- diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezInputContextImpl.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezInputContextImpl.java index b4558d0..1d76d86 100644 --- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezInputContextImpl.java +++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezInputContextImpl.java @@ -49,6 +49,9 @@ public class TezInputContextImpl extends TezTaskContextImpl this.sourceInfo = new EventMetaData( EventGenerator.INPUT, taskVertexName, sourceVertexName, taskAttemptID); + this.uniqueIdentifier = String.format("%s_%s_%6d_%2d_%s", taskAttemptID + .getTaskID().getVertexID().getDAGId().toString(), taskVertexName, + getTaskIndex(), getAttemptNumber(), sourceVertexName); } @Override @@ -70,5 +73,4 @@ public class TezInputContextImpl extends TezTaskContextImpl public String getSourceVertexName() { return sourceVertexName; } - } http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezOutputContextImpl.java ---------------------------------------------------------------------- diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezOutputContextImpl.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezOutputContextImpl.java index ba632db..e5b81d0 100644 --- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezOutputContextImpl.java +++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezOutputContextImpl.java @@ -49,6 +49,9 @@ public class TezOutputContextImpl extends TezTaskContextImpl this.tezUmbilical = tezUmbilical; this.sourceInfo = new EventMetaData(EventGenerator.OUTPUT, taskVertexName, destinationVertexName, 
taskAttemptID); + this.uniqueIdentifier = String.format("%s_%s_%6d_%2d_%s", taskAttemptID + .getTaskID().getVertexID().getDAGId().toString(), taskVertexName, + getTaskIndex(), getAttemptNumber(), destinationVertexName); } @Override http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezProcessorContextImpl.java ---------------------------------------------------------------------- diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezProcessorContextImpl.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezProcessorContextImpl.java index 4e0f061..73c4a54 100644 --- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezProcessorContextImpl.java +++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezProcessorContextImpl.java @@ -44,6 +44,9 @@ public class TezProcessorContextImpl extends TezTaskContextImpl this.tezUmbilical = tezUmbilical; this.sourceInfo = new EventMetaData(EventGenerator.PROCESSOR, taskVertexName, "", taskAttemptID); + this.uniqueIdentifier = String.format("%s_%s_%6d_%2d", taskAttemptID + .getTaskID().getVertexID().getDAGId().toString(), taskVertexName, + getTaskIndex(), getAttemptNumber()); } @Override http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezTaskContextImpl.java ---------------------------------------------------------------------- diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezTaskContextImpl.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezTaskContextImpl.java index 712eec3..b77bcdd 100644 --- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezTaskContextImpl.java +++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezTaskContextImpl.java @@ -18,18 +18,27 @@ package org.apache.tez.engine.newapi.impl; +import java.nio.ByteBuffer; +import java.util.Arrays; 
+ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.util.AuxiliaryServiceHelper; +import org.apache.tez.common.TezJobConfig; import org.apache.tez.common.counters.TezCounters; import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.engine.newapi.TezTaskContext; +import org.apache.tez.engine.shuffle.common.ShuffleUtils; public abstract class TezTaskContextImpl implements TezTaskContext { - protected final Configuration conf; + private final Configuration conf; protected final String taskVertexName; - protected final TezTaskAttemptID taskAttemptID; - protected final TezCounters counters; + private final TezTaskAttemptID taskAttemptID; + private final TezCounters counters; + private String[] workDirs; + protected String uniqueIdentifier; @Private public TezTaskContextImpl(Configuration conf, @@ -39,9 +48,18 @@ public abstract class TezTaskContextImpl implements TezTaskContext { this.taskVertexName = taskVertexName; this.taskAttemptID = taskAttemptID; this.counters = counters; + // TODO Maybe change this to be task id specific at some point. For now + // Shuffle code relies on this being a path specified by YARN + this.workDirs = this.conf.getStrings(TezJobConfig.LOCAL_DIRS); } @Override + public ApplicationId getApplicationId() { + return taskAttemptID.getTaskID().getVertexID().getDAGId() + .getApplicationId(); + } + + @Override public int getTaskIndex() { return taskAttemptID.getTaskID().getId(); } @@ -52,8 +70,14 @@ public abstract class TezTaskContextImpl implements TezTaskContext { } @Override + public String getDAGName() { + // TODO NEWTEZ Change to some form of the DAG name, for now using dagId as + // the unique identifier. 
+ return taskAttemptID.getTaskID().getVertexID().getDAGId().toString(); + } + + @Override public String getTaskVertexName() { - // TODO Auto-generated method stub return taskVertexName; } @@ -63,5 +87,30 @@ public abstract class TezTaskContextImpl implements TezTaskContext { return counters; } - // TODO Add a method to get working dir + @Override + public String[] getWorkDirs() { + return Arrays.copyOf(workDirs, workDirs.length); + } + + @Override + public String getUniqueIdentifier() { + return uniqueIdentifier; + } + + @Override + public void fatalError(Throwable exception, String message) { + // TODO NEWTEZ Implement once the TezContext communication is setup. + } + + @Override + public ByteBuffer getServiceConsumerMetaData(String serviceName) { + // TODO NEWTEZ Make sure this data is set by the AM for the Shuffle service name. + return null; + } + + @Override + public ByteBuffer getServiceProviderMetaData(String serviceName) { + return AuxiliaryServiceHelper.getServiceDataFromEnv( + ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID, System.getenv()); + } } http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/shuffle/common/ShuffleUtils.java ---------------------------------------------------------------------- diff --git a/tez-engine/src/main/java/org/apache/tez/engine/shuffle/common/ShuffleUtils.java b/tez-engine/src/main/java/org/apache/tez/engine/shuffle/common/ShuffleUtils.java new file mode 100644 index 0000000..3a6b2e4 --- /dev/null +++ b/tez-engine/src/main/java/org/apache/tez/engine/shuffle/common/ShuffleUtils.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.engine.shuffle.common; + +import java.io.IOException; +import java.nio.ByteBuffer; + +import javax.crypto.SecretKey; + +import org.apache.hadoop.io.DataInputByteBuffer; +import org.apache.hadoop.security.token.Token; +import org.apache.tez.engine.common.security.JobTokenIdentifier; +import org.apache.tez.engine.common.security.JobTokenSecretManager; + +public class ShuffleUtils { + + public static String SHUFFLE_HANDLER_SERVICE_ID = "mapreduce.shuffle"; + + public static SecretKey getJobTokenSecretFromTokenBytes(ByteBuffer meta) + throws IOException { + DataInputByteBuffer in = new DataInputByteBuffer(); + in.reset(meta); + Token jt = new Token(); + jt.readFields(in); + SecretKey sk = JobTokenSecretManager.createSecretKey(jt.getPassword()); + return sk; + } + + public static int deserializeShuffleMetaData(ByteBuffer meta) + throws IOException { + DataInputByteBuffer in = new DataInputByteBuffer(); + try { + in.reset(meta); + int port = in.readInt(); + return port; + } finally { + in.close(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java 
b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java index b2f1318..9bc430b 100644 --- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java +++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java @@ -80,8 +80,8 @@ import org.apache.tez.dag.api.client.DAGStatus; import org.apache.tez.engine.common.objectregistry.ObjectLifeCycle; import org.apache.tez.engine.common.objectregistry.ObjectRegistry; import org.apache.tez.engine.common.objectregistry.ObjectRegistryFactory; -import org.apache.tez.engine.lib.input.ShuffledMergedInput; -import org.apache.tez.engine.lib.output.OnFileSortedOutput; +import org.apache.tez.engine.lib.oldinput.OldShuffledMergedInput; +import org.apache.tez.engine.lib.oldoutput.OldOnFileSortedOutput; import org.apache.tez.mapreduce.hadoop.InputSplitInfo; import org.apache.tez.mapreduce.hadoop.MRHelpers; import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator; @@ -611,9 +611,9 @@ public class MRRSleepJob extends Configured implements Tool { DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, new OutputDescriptor( - OnFileSortedOutput.class.getName()), + OldOnFileSortedOutput.class.getName()), new InputDescriptor( - ShuffledMergedInput.class.getName())))); + OldShuffledMergedInput.class.getName())))); } } http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java index 7fae4a3..016fbda 100644 --- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java +++ 
b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java @@ -70,8 +70,8 @@ import org.apache.tez.dag.api.EdgeProperty.DataSourceType; import org.apache.tez.dag.api.EdgeProperty.SchedulingType; import org.apache.tez.dag.api.client.DAGClient; import org.apache.tez.dag.api.client.DAGStatus; -import org.apache.tez.engine.lib.input.ShuffledMergedInput; -import org.apache.tez.engine.lib.output.OnFileSortedOutput; +import org.apache.tez.engine.lib.oldinput.OldShuffledMergedInput; +import org.apache.tez.engine.lib.oldoutput.OldOnFileSortedOutput; import org.apache.tez.mapreduce.hadoop.InputSplitInfo; import org.apache.tez.mapreduce.hadoop.MRHelpers; import org.apache.tez.mapreduce.hadoop.MRJobConfig; @@ -312,9 +312,9 @@ public class OrderedWordCount { DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, new OutputDescriptor( - OnFileSortedOutput.class.getName()), + OldOnFileSortedOutput.class.getName()), new InputDescriptor( - ShuffledMergedInput.class.getName())))); + OldShuffledMergedInput.class.getName())))); } } http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java b/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java index 701ca87..12953e4 100644 --- a/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java +++ b/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java @@ -68,8 +68,8 @@ import org.apache.tez.dag.api.EdgeProperty.SchedulingType; import org.apache.tez.dag.api.client.DAGClient; import org.apache.tez.dag.api.client.DAGStatus; import org.apache.tez.dag.api.client.DAGStatus.State; -import org.apache.tez.engine.lib.input.ShuffledMergedInput; -import 
org.apache.tez.engine.lib.output.OnFileSortedOutput; +import org.apache.tez.engine.lib.oldinput.OldShuffledMergedInput; +import org.apache.tez.engine.lib.oldoutput.OldOnFileSortedOutput; import org.apache.tez.mapreduce.examples.MRRSleepJob; import org.apache.tez.mapreduce.examples.MRRSleepJob.ISleepReducer; import org.apache.tez.mapreduce.examples.MRRSleepJob.MRRSleepJobPartitioner; @@ -396,23 +396,23 @@ public class TestMRRJobsDAGApi { Edge edge1 = new Edge(stage1Vertex, stage2Vertex, new EdgeProperty( DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, new OutputDescriptor( - OnFileSortedOutput.class.getName()), new InputDescriptor( - ShuffledMergedInput.class.getName()))); + OldOnFileSortedOutput.class.getName()), new InputDescriptor( + OldShuffledMergedInput.class.getName()))); Edge edge11 = new Edge(stage11Vertex, stage22Vertex, new EdgeProperty( DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, new OutputDescriptor( - OnFileSortedOutput.class.getName()), new InputDescriptor( - ShuffledMergedInput.class.getName()))); + OldOnFileSortedOutput.class.getName()), new InputDescriptor( + OldShuffledMergedInput.class.getName()))); Edge edge2 = new Edge(stage2Vertex, stage3Vertex, new EdgeProperty( DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, new OutputDescriptor( - OnFileSortedOutput.class.getName()), new InputDescriptor( - ShuffledMergedInput.class.getName()))); + OldOnFileSortedOutput.class.getName()), new InputDescriptor( + OldShuffledMergedInput.class.getName()))); Edge edge3 = new Edge(stage22Vertex, stage3Vertex, new EdgeProperty( DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, new OutputDescriptor( - OnFileSortedOutput.class.getName()), new InputDescriptor( - ShuffledMergedInput.class.getName()))); + OldOnFileSortedOutput.class.getName()), new InputDescriptor( + OldShuffledMergedInput.class.getName()))); 
dag.addEdge(edge1); dag.addEdge(edge11); http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerTez.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerTez.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerTez.java index 204f517..7df783b 100644 --- a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerTez.java +++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerTez.java @@ -76,8 +76,8 @@ import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.engine.api.Task; import org.apache.tez.engine.common.task.local.output.TezLocalTaskOutputFiles; import org.apache.tez.engine.common.task.local.output.TezTaskOutput; -import org.apache.tez.engine.lib.input.LocalMergedInput; -import org.apache.tez.engine.lib.output.LocalOnFileSorterOutput; +import org.apache.tez.engine.lib.oldinput.LocalMergedInput; +import org.apache.tez.engine.lib.oldoutput.OldLocalOnFileSorterOutput; import org.apache.tez.engine.newapi.impl.TezEvent; import org.apache.tez.engine.newapi.impl.TezHeartbeatRequest; import org.apache.tez.engine.newapi.impl.TezHeartbeatResponse; @@ -257,7 +257,7 @@ public class LocalJobRunnerTez implements ClientProtocol { Collections.singletonList(new InputSpec("srcVertex", 0, SimpleInput.class.getName())), Collections.singletonList(new OutputSpec("tgtVertex", 0, - LocalOnFileSorterOutput.class.getName()))); + OldLocalOnFileSorterOutput.class.getName()))); TezTaskOutput mapOutput = new TezLocalTaskOutputFiles(); mapOutput.setConf(localConf); http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java ---------------------------------------------------------------------- diff --git 
a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java index 5793104..4fb1876 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java @@ -48,7 +48,7 @@ import org.apache.tez.engine.api.Processor; import org.apache.tez.engine.common.ConfigUtils; import org.apache.tez.engine.common.sort.SortingOutput; import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator; -import org.apache.tez.engine.lib.input.ShuffledMergedInput; +import org.apache.tez.engine.lib.oldinput.OldShuffledMergedInput; import org.apache.tez.mapreduce.hadoop.mapred.TaskAttemptContextImpl; import org.apache.tez.mapreduce.input.SimpleInput; import org.apache.tez.mapreduce.output.SimpleOutput; @@ -108,12 +108,12 @@ implements Processor { if (in instanceof SimpleInput) { ((SimpleInput)in).setTask(this); - } else if (in instanceof ShuffledMergedInput) { - ((ShuffledMergedInput)in).setTask(this); + } else if (in instanceof OldShuffledMergedInput) { + ((OldShuffledMergedInput)in).setTask(this); } if(ins.length > 1) { - if (!(in instanceof ShuffledMergedInput)) { + if (!(in instanceof OldShuffledMergedInput)) { throw new IOException( "Only ShuffledMergedInput can support multiple inputs" + ". 
inputCount=" + ins.length); @@ -124,15 +124,15 @@ implements Processor { + ins.length + " From contex:" + inputs.size()); } // initialize and merge the remaining - ShuffledMergedInput s0 = ((ShuffledMergedInput)in); + OldShuffledMergedInput s0 = ((OldShuffledMergedInput)in); for(int i=1; i comparator, Class keyClass, Class valClass, @@ -297,7 +297,7 @@ implements Processor { void runNewReducer(JobConf job, final TezTaskUmbilicalProtocol umbilical, final MRTaskReporter reporter, - ShuffledMergedInput input, + OldShuffledMergedInput input, RawComparator comparator, Class keyClass, Class valueClass, http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java index ae62251..3610f9f 100644 --- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java +++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java @@ -38,8 +38,7 @@ import org.apache.tez.engine.api.Task; import org.apache.tez.engine.common.sort.impl.IFile; import org.apache.tez.engine.common.task.local.output.TezLocalTaskOutputFiles; import org.apache.tez.engine.common.task.local.output.TezTaskOutput; -import org.apache.tez.engine.lib.output.InMemorySortedOutput; -import org.apache.tez.engine.lib.output.LocalOnFileSorterOutput; +import org.apache.tez.engine.lib.oldoutput.OldLocalOnFileSorterOutput; import org.apache.tez.mapreduce.TestUmbilicalProtocol; import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator; import org.apache.tez.mapreduce.hadoop.MultiStageMRConfigUtil; @@ -47,14 +46,9 @@ import org.apache.tez.mapreduce.hadoop.mapreduce.TezNullOutputCommitter; import 
org.apache.tez.mapreduce.input.SimpleInput; import org.apache.tez.mapreduce.processor.MRTask; import org.apache.tez.mapreduce.processor.MapUtils; -import org.jboss.netty.buffer.BigEndianHeapChannelBuffer; -import org.jboss.netty.buffer.ChannelBuffer; -import org.jboss.netty.buffer.TruncatedChannelBuffer; -import org.jboss.netty.handler.stream.ChunkedStream; import org.junit.After; import org.junit.Assert; import org.junit.Before; -import org.junit.Ignore; import org.junit.Test; @SuppressWarnings("deprecation") @@ -122,7 +116,7 @@ public class TestMapProcessor { Collections.singletonList(new InputSpec("NullVertex", 0, SimpleInput.class.getName())), Collections.singletonList(new OutputSpec("FakeVertex", 1, - LocalOnFileSorterOutput.class.getName()))); + OldLocalOnFileSorterOutput.class.getName()))); MRTask mrTask = (MRTask)t.getProcessor(); Assert.assertEquals(TezNullOutputCommitter.class.getName(), mrTask @@ -151,76 +145,76 @@ public class TestMapProcessor { reader.close(); } - @Test - @Ignore - public void testMapProcessorWithInMemSort() throws Exception { - - String vertexName = MultiStageMRConfigUtil.getInitialMapVertexName(); - - final int partitions = 2; - JobConf jobConf = new JobConf(defaultConf); - jobConf.setNumReduceTasks(partitions); - setUpJobConf(jobConf); - TezTaskOutput mapOutputs = new TezLocalTaskOutputFiles(); - mapOutputs.setConf(jobConf); - - Configuration conf = MultiStageMRConfToTezTranslator.convertMRToLinearTez(jobConf); - Configuration stageConf = MultiStageMRConfigUtil.getConfForVertex(conf, - vertexName); - - JobConf job = new JobConf(stageConf); - - job.set(TezJobConfig.TASK_LOCAL_RESOURCE_DIR, new Path(workDir, - "localized-resources").toUri().toString()); - localFs.delete(workDir, true); - Task t = - MapUtils.runMapProcessor( - localFs, workDir, job, 0, new Path(workDir, "map0"), - new TestUmbilicalProtocol(true), vertexName, - Collections.singletonList(new InputSpec("NullVertex", 0, - SimpleInput.class.getName())), - 
Collections.singletonList(new OutputSpec("FakeVertex", 1, - InMemorySortedOutput.class.getName())) - ); - InMemorySortedOutput[] outputs = (InMemorySortedOutput[])t.getOutputs(); - - verifyInMemSortedStream(outputs[0], 0, 4096); - int i = 0; - for (i = 2; i < 256; i <<= 1) { - verifyInMemSortedStream(outputs[0], 0, i); - } - verifyInMemSortedStream(outputs[0], 1, 4096); - for (i = 2; i < 256; i <<= 1) { - verifyInMemSortedStream(outputs[0], 1, i); - } - - t.close(); - } - - private void verifyInMemSortedStream( - InMemorySortedOutput output, int partition, int chunkSize) - throws Exception { - ChunkedStream cs = - new ChunkedStream( - output.getSorter().getSortedStream(partition), chunkSize); - int actualBytes = 0; - ChannelBuffer b = null; - while ((b = (ChannelBuffer)cs.nextChunk()) != null) { - LOG.info("b = " + b); - actualBytes += - (b instanceof TruncatedChannelBuffer) ? - ((TruncatedChannelBuffer)b).capacity() : - ((BigEndianHeapChannelBuffer)b).readableBytes(); - } - - LOG.info("verifyInMemSortedStream" + - " partition=" + partition + - " chunkSize=" + chunkSize + - " expected=" + - output.getSorter().getShuffleHeader(partition).getCompressedLength() + - " actual=" + actualBytes); - Assert.assertEquals( - output.getSorter().getShuffleHeader(partition).getCompressedLength(), - actualBytes); - } +// @Test +// @Ignore +// public void testMapProcessorWithInMemSort() throws Exception { +// +// String vertexName = MultiStageMRConfigUtil.getInitialMapVertexName(); +// +// final int partitions = 2; +// JobConf jobConf = new JobConf(defaultConf); +// jobConf.setNumReduceTasks(partitions); +// setUpJobConf(jobConf); +// TezTaskOutput mapOutputs = new TezLocalTaskOutputFiles(); +// mapOutputs.setConf(jobConf); +// +// Configuration conf = MultiStageMRConfToTezTranslator.convertMRToLinearTez(jobConf); +// Configuration stageConf = MultiStageMRConfigUtil.getConfForVertex(conf, +// vertexName); +// +// JobConf job = new JobConf(stageConf); +// +// 
job.set(TezJobConfig.TASK_LOCAL_RESOURCE_DIR, new Path(workDir, +// "localized-resources").toUri().toString()); +// localFs.delete(workDir, true); +// Task t = +// MapUtils.runMapProcessor( +// localFs, workDir, job, 0, new Path(workDir, "map0"), +// new TestUmbilicalProtocol(true), vertexName, +// Collections.singletonList(new InputSpec("NullVertex", 0, +// SimpleInput.class.getName())), +// Collections.singletonList(new OutputSpec("FakeVertex", 1, +// OldInMemorySortedOutput.class.getName())) +// ); +// OldInMemorySortedOutput[] outputs = (OldInMemorySortedOutput[])t.getOutputs(); +// +// verifyInMemSortedStream(outputs[0], 0, 4096); +// int i = 0; +// for (i = 2; i < 256; i <<= 1) { +// verifyInMemSortedStream(outputs[0], 0, i); +// } +// verifyInMemSortedStream(outputs[0], 1, 4096); +// for (i = 2; i < 256; i <<= 1) { +// verifyInMemSortedStream(outputs[0], 1, i); +// } +// +// t.close(); +// } +// +// private void verifyInMemSortedStream( +// OldInMemorySortedOutput output, int partition, int chunkSize) +// throws Exception { +// ChunkedStream cs = +// new ChunkedStream( +// output.getSorter().getSortedStream(partition), chunkSize); +// int actualBytes = 0; +// ChannelBuffer b = null; +// while ((b = (ChannelBuffer)cs.nextChunk()) != null) { +// LOG.info("b = " + b); +// actualBytes += +// (b instanceof TruncatedChannelBuffer) ? 
+// ((TruncatedChannelBuffer)b).capacity() : +// ((BigEndianHeapChannelBuffer)b).readableBytes(); +// } +// +// LOG.info("verifyInMemSortedStream" + +// " partition=" + partition + +// " chunkSize=" + chunkSize + +// " expected=" + +// output.getSorter().getShuffleHeader(partition).getCompressedLength() + +// " actual=" + actualBytes); +// Assert.assertEquals( +// output.getSorter().getShuffleHeader(partition).getCompressedLength(), +// actualBytes); +// } } http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java index 2428000..2a121a6 100644 --- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java +++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java @@ -40,8 +40,8 @@ import org.apache.tez.dag.api.ProcessorDescriptor; import org.apache.tez.engine.api.Task; import org.apache.tez.engine.common.task.local.output.TezLocalTaskOutputFiles; import org.apache.tez.engine.common.task.local.output.TezTaskOutput; -import org.apache.tez.engine.lib.input.LocalMergedInput; -import org.apache.tez.engine.lib.output.LocalOnFileSorterOutput; +import org.apache.tez.engine.lib.oldinput.LocalMergedInput; +import org.apache.tez.engine.lib.oldoutput.OldLocalOnFileSorterOutput; import org.apache.tez.engine.runtime.RuntimeUtils; import org.apache.tez.mapreduce.TestUmbilicalProtocol; import org.apache.tez.mapreduce.TezTestUtils; @@ -122,7 +122,7 @@ public class TestReduceProcessor { Collections.singletonList(new InputSpec("NullVertex", 0, SimpleInput.class.getName())), Collections.singletonList(new OutputSpec("FakeVertex", 1, 
- LocalOnFileSorterOutput.class.getName()))); + OldLocalOnFileSorterOutput.class.getName()))); LOG.info("Starting reduce..."); http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java ---------------------------------------------------------------------- diff --git a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java b/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java index 0fb823f..2d59b18 100644 --- a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java +++ b/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java @@ -95,8 +95,8 @@ import org.apache.tez.dag.api.Vertex; import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint; import org.apache.tez.dag.api.client.DAGClient; import org.apache.tez.dag.api.client.DAGStatus; -import org.apache.tez.engine.lib.input.ShuffledMergedInput; -import org.apache.tez.engine.lib.output.OnFileSortedOutput; +import org.apache.tez.engine.lib.oldinput.OldShuffledMergedInput; +import org.apache.tez.engine.lib.oldoutput.OldOnFileSortedOutput; import org.apache.tez.mapreduce.hadoop.DeprecatedKeys; import org.apache.tez.mapreduce.hadoop.MRHelpers; import org.apache.tez.mapreduce.hadoop.MRJobConfig; @@ -463,8 +463,8 @@ public class YARNRunner implements ClientProtocol { EdgeProperty edgeProperty = new EdgeProperty( DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, - new OutputDescriptor(OnFileSortedOutput.class.getName()), - new InputDescriptor(ShuffledMergedInput.class.getName())); + new OutputDescriptor(OldOnFileSortedOutput.class.getName()), + new InputDescriptor(OldShuffledMergedInput.class.getName())); Edge edge = null; edge = new Edge(vertices[i - 1], vertices[i], edgeProperty);