tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject [1/4] TEZ-417. Change Shuffle Input/Output to work with the new APIs (part of TEZ-398). (sseth)
Date Wed, 11 Sep 2013 04:49:46 GMT
Updated Branches:
  refs/heads/TEZ-398 e5919fa75 -> 1cf7f197d


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/lib/output/OnFileSortedOutput.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/lib/output/OnFileSortedOutput.java b/tez-engine/src/main/java/org/apache/tez/engine/lib/output/OnFileSortedOutput.java
index 79787b7..29a4b02 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/lib/output/OnFileSortedOutput.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/lib/output/OnFileSortedOutput.java
@@ -18,52 +18,103 @@
 package org.apache.tez.engine.lib.output;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.tez.common.RunningTaskContext;
-import org.apache.tez.common.TezEngineTaskContext;
-import org.apache.tez.engine.api.Master;
-import org.apache.tez.engine.api.Output;
-import org.apache.tez.engine.common.sort.SortingOutput;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.engine.common.shuffle.newimpl.ShuffleUserPayloads.DataMovementEventPayloadProto;
 import org.apache.tez.engine.common.sort.impl.ExternalSorter;
 import org.apache.tez.engine.common.sort.impl.dflt.DefaultSorter;
-import org.apache.tez.engine.records.OutputContext;
+import org.apache.tez.engine.newapi.Event;
+import org.apache.tez.engine.newapi.KVWriter;
+import org.apache.tez.engine.newapi.LogicalOutput;
+import org.apache.tez.engine.newapi.TezOutputContext;
+import org.apache.tez.engine.newapi.Writer;
+import org.apache.tez.engine.newapi.events.DataMovementEvent;
+import org.apache.tez.engine.shuffle.common.ShuffleUtils;
+
+import com.google.common.collect.Lists;
 
 /**
- * {@link OnFileSortedOutput} is an {@link Output} which sorts key/value pairs 
+ * <code>OnFileSortedOutput</code> is an {@link LogicalOutput} which sorts key/value pairs
  * written to it and persists it to a file.
  */
-public class OnFileSortedOutput implements SortingOutput {
+public class OnFileSortedOutput implements LogicalOutput {
   
   protected ExternalSorter sorter;
+  protected Configuration conf;
+  protected int numOutputs;
+  protected TezOutputContext outputContext;
+  private long startTime;
+  private long endTime;
   
-  public OnFileSortedOutput(TezEngineTaskContext task) throws IOException {
-    sorter = new DefaultSorter(task);
-  }
   
-  public void initialize(Configuration conf, Master master) 
-      throws IOException, InterruptedException {
-    sorter.initialize(conf, master);
+  @Override
+  public List<Event> initialize(TezOutputContext outputContext)
+      throws IOException {
+    this.startTime = System.nanoTime();
+    this.outputContext = outputContext;
+    sorter = new DefaultSorter();
+    this.conf = TezUtils.createConfFromUserPayload(outputContext.getUserPayload());
+    // Set the local dirs in this conf because several components read them
+    // from it (wherever LocalDirAllocator is used) - TezTaskOutputFiles,
+    // TezMerger, etc.
+    this.conf.setStrings(TezJobConfig.LOCAL_DIRS, outputContext.getWorkDirs());
+    sorter.initialize(outputContext, conf, numOutputs); // numOutputs is 0 unless setNumPhysicalOutputs() ran first
+    return Collections.emptyList();
   }
 
   @Override
-  public void setTask(RunningTaskContext task) {
-    sorter.setTask(task);
-  }
-  
-  public void write(Object key, Object value) throws IOException,
-      InterruptedException {
-    sorter.write(key, value);
+  public Writer getWriter() throws IOException {
+    return new KVWriter() {
+      @Override
+      public void write(Object key, Object value) throws IOException {
+        sorter.write(key, value);
+      }
+    };
   }
 
-  public void close() throws IOException, InterruptedException {
-    sorter.flush();
-    sorter.close();
+  @Override
+  public void handleEvents(List<Event> outputEvents) {
+    // Not expecting any events.
   }
 
   @Override
-  public OutputContext getOutputContext() {
-    return null;
+  public void setNumPhysicalOutputs(int numOutputs) {
+    this.numOutputs = numOutputs;
   }
 
+  @Override
+  public List<Event> close() throws IOException {
+    sorter.flush();
+    sorter.close();
+    this.endTime = System.nanoTime();
+
+    String host = System.getenv(ApplicationConstants.Environment.NM_HOST.toString());
+    // The shuffle port is read from the shuffle handler's service metadata.
+    ByteBuffer shuffleMetadata = outputContext
+        .getServiceProviderMetaData(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID);
+    int shufflePort = ShuffleUtils.deserializeShuffleMetaData(shuffleMetadata);
+
+    DataMovementEventPayloadProto.Builder payloadBuilder = DataMovementEventPayloadProto
+        .newBuilder();
+    payloadBuilder.setHost(host);
+    payloadBuilder.setPort(shufflePort);
+    payloadBuilder.setPathComponent(outputContext.getUniqueIdentifier());
+    payloadBuilder.setRunDuration((int) ((endTime - startTime) / 1000000)); // nanos -> millis
+    DataMovementEventPayloadProto payloadProto = payloadBuilder.build();
+
+    List<Event> events = Lists.newArrayListWithCapacity(numOutputs);
+
+    for (int i = 0; i < numOutputs; i++) {
+      DataMovementEvent event = new DataMovementEvent(i,
+          payloadProto.toByteArray());
+      events.add(event);
+    }
+    return events;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/newapi/KVReader.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/KVReader.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/KVReader.java
index 68e0f47..bd0e933 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/KVReader.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/KVReader.java
@@ -22,33 +22,57 @@ import java.io.IOException;
 
 /**
  * A key/value(s) pair based {@link Reader}.
+ *
+ * Example usage:
+ * <pre>
+ * while (kvReader.moveToNext()) {
+ *   KVRecord kvRecord = kvReader.getCurrentKV();
+ *   Object key = kvRecord.getKey();
+ *   Iterable values = kvRecord.getValues();
+ * }</pre>
+ *
  */
 public interface KVReader extends Reader {

   /**
-   * Check if there is another key/value(s) pair
+   * Moves to the next key/value(s) pair.
    * 
-   * @return true if another key/value(s) pair exists
+   * @return true if another key/value(s) pair exists, false if there are no more.
    * @throws IOException
    *           if an error occurs
    */
-  public boolean hasNext() throws IOException;
+  public boolean moveToNext() throws IOException;

   /**
-   * Gets the next key.
-   * 
-   * @return the next key, or null if none exists
+   * Returns the current key/value(s) pair. Use {@link #moveToNext()} to advance.
+   * @return the current key/value(s) record
    * @throws IOException
-   *           if an error occurs
    */
-  public Object getNextKey() throws IOException;
+  public KVRecord getCurrentKV() throws IOException;
+
+

+
   /**
-   * Get the next values.
-   * 
-   * @return an <code>Iterable</code> view of the values for the current key
-   * @throws IOException
-   *           if an error occurs
+   * Represents a key and an associated set of values, as returned by
+   * {@link KVReader#getCurrentKV()}.
    */
-  public Iterable<Object> getNextValues() throws IOException;
+  public static class KVRecord {
+
+    private final Object key;
+    private final Iterable<Object> values;
+
+    public KVRecord(Object key, Iterable<Object> values) {
+      this.key = key;
+      this.values = values;
+    }
+
+    public Object getKey() {
+      return this.key;
+    }
+
+    public Iterable<Object> getValues() {
+      return this.values;
+    }
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/newapi/KVWriter.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/KVWriter.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/KVWriter.java
index f945b63..ad48912 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/KVWriter.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/KVWriter.java
@@ -23,7 +23,7 @@ import java.io.IOException;
 /**
  * A key/value(s) pair based {@link Writer}
  */
-public interface KVWriter {
+public interface KVWriter extends Writer {
   /**
    * Writes a key/value pair.
    * 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezInputContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezInputContextImpl.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezInputContextImpl.java
index b4558d0..1d76d86 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezInputContextImpl.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezInputContextImpl.java
@@ -49,6 +49,9 @@ public class TezInputContextImpl extends TezTaskContextImpl
     this.sourceInfo = new EventMetaData(
         EventGenerator.INPUT, taskVertexName, sourceVertexName,
         taskAttemptID);
+    this.uniqueIdentifier = String.format("%s_%s_%06d_%02d_%s", taskAttemptID
+        .getTaskID().getVertexID().getDAGId().toString(), taskVertexName,
+        getTaskIndex(), getAttemptNumber(), sourceVertexName); // zero-padded: "%6d" would pad with spaces
   }

   @Override
@@ -70,5 +73,4 @@ public class TezInputContextImpl extends TezTaskContextImpl
   public String getSourceVertexName() {
     return sourceVertexName;
   }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezOutputContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezOutputContextImpl.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezOutputContextImpl.java
index ba632db..e5b81d0 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezOutputContextImpl.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezOutputContextImpl.java
@@ -49,6 +49,9 @@ public class TezOutputContextImpl extends TezTaskContextImpl
     this.tezUmbilical = tezUmbilical;
     this.sourceInfo = new EventMetaData(EventGenerator.OUTPUT, taskVertexName,
         destinationVertexName, taskAttemptID);
+    this.uniqueIdentifier = String.format("%s_%s_%06d_%02d_%s", taskAttemptID
+        .getTaskID().getVertexID().getDAGId().toString(), taskVertexName,
+        getTaskIndex(), getAttemptNumber(), destinationVertexName); // zero-padded: "%6d" would pad with spaces
   }

   @Override

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezProcessorContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezProcessorContextImpl.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezProcessorContextImpl.java
index 4e0f061..73c4a54 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezProcessorContextImpl.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezProcessorContextImpl.java
@@ -44,6 +44,9 @@ public class TezProcessorContextImpl extends TezTaskContextImpl
     this.tezUmbilical = tezUmbilical;
     this.sourceInfo = new EventMetaData(EventGenerator.PROCESSOR,
         taskVertexName, "", taskAttemptID);
+    this.uniqueIdentifier = String.format("%s_%s_%06d_%02d", taskAttemptID
+        .getTaskID().getVertexID().getDAGId().toString(), taskVertexName,
+        getTaskIndex(), getAttemptNumber()); // zero-padded: "%6d" would pad with spaces
   }

   @Override

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezTaskContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezTaskContextImpl.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezTaskContextImpl.java
index 712eec3..b77bcdd 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezTaskContextImpl.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezTaskContextImpl.java
@@ -18,18 +18,27 @@
 
 package org.apache.tez.engine.newapi.impl;
 
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.util.AuxiliaryServiceHelper;
+import org.apache.tez.common.TezJobConfig;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.engine.newapi.TezTaskContext;
+import org.apache.tez.engine.shuffle.common.ShuffleUtils;
 
 public abstract class TezTaskContextImpl implements TezTaskContext {
 
-  protected final Configuration conf;
+  private final Configuration conf;
   protected final String taskVertexName;
-  protected final TezTaskAttemptID taskAttemptID;
-  protected final TezCounters counters;
+  private final TezTaskAttemptID taskAttemptID;
+  private final TezCounters counters;
+  private String[] workDirs;
+  protected String uniqueIdentifier;
 
   @Private
   public TezTaskContextImpl(Configuration conf,
@@ -39,9 +48,18 @@ public abstract class TezTaskContextImpl implements TezTaskContext {
     this.taskVertexName = taskVertexName;
     this.taskAttemptID = taskAttemptID;
     this.counters = counters;
+    // TODO Maybe change this to be task id specific at some point. For now
+    // Shuffle code relies on this being a path specified by YARN
+    this.workDirs = this.conf.getStrings(TezJobConfig.LOCAL_DIRS); 
   }
 
   @Override
+  public ApplicationId getApplicationId() {
+    return taskAttemptID.getTaskID().getVertexID().getDAGId()
+        .getApplicationId();
+  }
+  
+  @Override
   public int getTaskIndex() {
     return taskAttemptID.getTaskID().getId();
   }
@@ -52,8 +70,14 @@ public abstract class TezTaskContextImpl implements TezTaskContext {
   }
 
   @Override
+  public String getDAGName() {
+    // TODO NEWTEZ Change to some form of the DAG name, for now using dagId as
+    // the unique identifier.
+    return taskAttemptID.getTaskID().getVertexID().getDAGId().toString();
+  }
+  
+  @Override
   public String getTaskVertexName() {
-    // TODO Auto-generated method stub
     return taskVertexName;
   }
 
@@ -63,5 +87,32 @@ public abstract class TezTaskContextImpl implements TezTaskContext {
     return counters;
   }

-  // TODO Add a method to get working dir
+  @Override
+  public String[] getWorkDirs() {
+    // Hand out a copy so callers cannot modify this context's work dirs.
+    return Arrays.copyOf(workDirs, workDirs.length);
+  }
+
+  @Override
+  public String getUniqueIdentifier() {
+    return uniqueIdentifier;
+  }
+
+  @Override
+  public void fatalError(Throwable exception, String message) {
+    // TODO NEWTEZ Implement once the TezContext communication is setup.
+  }
+
+  @Override
+  public ByteBuffer getServiceConsumerMetaData(String serviceName) {
+    // TODO NEWTEZ Make sure this data is set by the AM for the Shuffle service name.
+    return null;
+  }
+
+  @Override
+  public ByteBuffer getServiceProviderMetaData(String serviceName) {
+    // Read the named auxiliary service's metadata from this process's environment.
+    return AuxiliaryServiceHelper.getServiceDataFromEnv(
+        serviceName, System.getenv());
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/shuffle/common/ShuffleUtils.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/shuffle/common/ShuffleUtils.java b/tez-engine/src/main/java/org/apache/tez/engine/shuffle/common/ShuffleUtils.java
new file mode 100644
index 0000000..3a6b2e4
--- /dev/null
+++ b/tez-engine/src/main/java/org/apache/tez/engine/shuffle/common/ShuffleUtils.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.shuffle.common;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import javax.crypto.SecretKey;
+
+import org.apache.hadoop.io.DataInputByteBuffer;
+import org.apache.hadoop.security.token.Token;
+import org.apache.tez.engine.common.security.JobTokenIdentifier;
+import org.apache.tez.engine.common.security.JobTokenSecretManager;
+
+/**
+ * Static helpers for decoding shuffle-related metadata.
+ */
+public class ShuffleUtils {
+
+  /** Service id under which the shuffle handler's metadata is looked up. */
+  public static final String SHUFFLE_HANDLER_SERVICE_ID = "mapreduce.shuffle";
+
+  /**
+   * Reads a serialized job {@link Token} from {@code meta} and creates a
+   * secret key from its password.
+   *
+   * @param meta buffer holding the serialized token
+   * @return the secret key created from the token password
+   * @throws IOException if the token cannot be deserialized
+   */
+  public static SecretKey getJobTokenSecretFromTokenBytes(ByteBuffer meta)
+      throws IOException {
+    DataInputByteBuffer in = new DataInputByteBuffer();
+    try {
+      in.reset(meta);
+      Token<JobTokenIdentifier> jt = new Token<JobTokenIdentifier>();
+      jt.readFields(in);
+      return JobTokenSecretManager.createSecretKey(jt.getPassword());
+    } finally {
+      in.close();
+    }
+  }
+
+  /**
+   * Reads the shuffle port from the shuffle handler's service metadata.
+   *
+   * @param meta buffer whose next four bytes hold the port as a big-endian int
+   * @return the shuffle port
+   * @throws IOException if the buffer cannot be read
+   */
+  public static int deserializeShuffleMetaData(ByteBuffer meta)
+      throws IOException {
+    DataInputByteBuffer in = new DataInputByteBuffer();
+    try {
+      in.reset(meta);
+      return in.readInt();
+    } finally {
+      in.close();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
index b2f1318..9bc430b 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
@@ -80,8 +80,8 @@ import org.apache.tez.dag.api.client.DAGStatus;
 import org.apache.tez.engine.common.objectregistry.ObjectLifeCycle;
 import org.apache.tez.engine.common.objectregistry.ObjectRegistry;
 import org.apache.tez.engine.common.objectregistry.ObjectRegistryFactory;
-import org.apache.tez.engine.lib.input.ShuffledMergedInput;
-import org.apache.tez.engine.lib.output.OnFileSortedOutput;
+import org.apache.tez.engine.lib.oldinput.OldShuffledMergedInput;
+import org.apache.tez.engine.lib.oldoutput.OldOnFileSortedOutput;
 import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
 import org.apache.tez.mapreduce.hadoop.MRHelpers;
 import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
@@ -611,9 +611,9 @@ public class MRRSleepJob extends Configured implements Tool {
                 DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED,
                 SchedulingType.SEQUENTIAL, 
                 new OutputDescriptor(
-                    OnFileSortedOutput.class.getName()),
+                    OldOnFileSortedOutput.class.getName()),
                 new InputDescriptor(
-                    ShuffledMergedInput.class.getName()))));
+                    OldShuffledMergedInput.class.getName()))));
       }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
index 7fae4a3..016fbda 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
@@ -70,8 +70,8 @@ import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
 import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGStatus;
-import org.apache.tez.engine.lib.input.ShuffledMergedInput;
-import org.apache.tez.engine.lib.output.OnFileSortedOutput;
+import org.apache.tez.engine.lib.oldinput.OldShuffledMergedInput;
+import org.apache.tez.engine.lib.oldoutput.OldOnFileSortedOutput;
 import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
 import org.apache.tez.mapreduce.hadoop.MRHelpers;
 import org.apache.tez.mapreduce.hadoop.MRJobConfig;
@@ -312,9 +312,9 @@ public class OrderedWordCount {
                 DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED,
                 SchedulingType.SEQUENTIAL, 
                 new OutputDescriptor(
-                    OnFileSortedOutput.class.getName()),
+                    OldOnFileSortedOutput.class.getName()),
                 new InputDescriptor(
-                    ShuffledMergedInput.class.getName()))));
+                    OldShuffledMergedInput.class.getName()))));
       }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java b/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
index 701ca87..12953e4 100644
--- a/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
+++ b/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
@@ -68,8 +68,8 @@ import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGStatus;
 import org.apache.tez.dag.api.client.DAGStatus.State;
-import org.apache.tez.engine.lib.input.ShuffledMergedInput;
-import org.apache.tez.engine.lib.output.OnFileSortedOutput;
+import org.apache.tez.engine.lib.oldinput.OldShuffledMergedInput;
+import org.apache.tez.engine.lib.oldoutput.OldOnFileSortedOutput;
 import org.apache.tez.mapreduce.examples.MRRSleepJob;
 import org.apache.tez.mapreduce.examples.MRRSleepJob.ISleepReducer;
 import org.apache.tez.mapreduce.examples.MRRSleepJob.MRRSleepJobPartitioner;
@@ -396,23 +396,23 @@ public class TestMRRJobsDAGApi {
     Edge edge1 = new Edge(stage1Vertex, stage2Vertex, new EdgeProperty(
         DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED,
         SchedulingType.SEQUENTIAL, new OutputDescriptor(
-        OnFileSortedOutput.class.getName()), new InputDescriptor(
-                ShuffledMergedInput.class.getName())));
+        OldOnFileSortedOutput.class.getName()), new InputDescriptor(
+                OldShuffledMergedInput.class.getName())));
     Edge edge11 = new Edge(stage11Vertex, stage22Vertex, new EdgeProperty(
         DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED, 
         SchedulingType.SEQUENTIAL, new OutputDescriptor(
-        OnFileSortedOutput.class.getName()), new InputDescriptor(
-                ShuffledMergedInput.class.getName())));
+        OldOnFileSortedOutput.class.getName()), new InputDescriptor(
+                OldShuffledMergedInput.class.getName())));
     Edge edge2 = new Edge(stage2Vertex, stage3Vertex, new EdgeProperty(
         DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED, 
         SchedulingType.SEQUENTIAL, new OutputDescriptor(
-        OnFileSortedOutput.class.getName()), new InputDescriptor(
-                ShuffledMergedInput.class.getName())));
+        OldOnFileSortedOutput.class.getName()), new InputDescriptor(
+                OldShuffledMergedInput.class.getName())));
     Edge edge3 = new Edge(stage22Vertex, stage3Vertex, new EdgeProperty(
         DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED, 
         SchedulingType.SEQUENTIAL, new OutputDescriptor(
-        OnFileSortedOutput.class.getName()), new InputDescriptor(
-                ShuffledMergedInput.class.getName())));
+        OldOnFileSortedOutput.class.getName()), new InputDescriptor(
+                OldShuffledMergedInput.class.getName())));
 
     dag.addEdge(edge1);
     dag.addEdge(edge11);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerTez.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerTez.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerTez.java
index 204f517..7df783b 100644
--- a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerTez.java
+++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerTez.java
@@ -76,8 +76,8 @@ import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.engine.api.Task;
 import org.apache.tez.engine.common.task.local.output.TezLocalTaskOutputFiles;
 import org.apache.tez.engine.common.task.local.output.TezTaskOutput;
-import org.apache.tez.engine.lib.input.LocalMergedInput;
-import org.apache.tez.engine.lib.output.LocalOnFileSorterOutput;
+import org.apache.tez.engine.lib.oldinput.LocalMergedInput;
+import org.apache.tez.engine.lib.oldoutput.OldLocalOnFileSorterOutput;
 import org.apache.tez.engine.newapi.impl.TezEvent;
 import org.apache.tez.engine.newapi.impl.TezHeartbeatRequest;
 import org.apache.tez.engine.newapi.impl.TezHeartbeatResponse;
@@ -257,7 +257,7 @@ public class LocalJobRunnerTez implements ClientProtocol {
                   Collections.singletonList(new InputSpec("srcVertex", 0,
                       SimpleInput.class.getName())),
                   Collections.singletonList(new OutputSpec("tgtVertex", 0,
-                      LocalOnFileSorterOutput.class.getName())));
+                      OldLocalOnFileSorterOutput.class.getName())));
 
           TezTaskOutput mapOutput = new TezLocalTaskOutputFiles();
           mapOutput.setConf(localConf);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
index 5793104..4fb1876 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
@@ -48,7 +48,7 @@ import org.apache.tez.engine.api.Processor;
 import org.apache.tez.engine.common.ConfigUtils;
 import org.apache.tez.engine.common.sort.SortingOutput;
 import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;
-import org.apache.tez.engine.lib.input.ShuffledMergedInput;
+import org.apache.tez.engine.lib.oldinput.OldShuffledMergedInput;
 import org.apache.tez.mapreduce.hadoop.mapred.TaskAttemptContextImpl;
 import org.apache.tez.mapreduce.input.SimpleInput;
 import org.apache.tez.mapreduce.output.SimpleOutput;
@@ -108,12 +108,12 @@ implements Processor {
 
     if (in instanceof SimpleInput) {
       ((SimpleInput)in).setTask(this);
-    } else if (in instanceof ShuffledMergedInput) {
-      ((ShuffledMergedInput)in).setTask(this);
+    } else if (in instanceof OldShuffledMergedInput) {
+      ((OldShuffledMergedInput)in).setTask(this);
     }
     
     if(ins.length > 1) {
-      if (!(in instanceof ShuffledMergedInput)) {
+      if (!(in instanceof OldShuffledMergedInput)) {
         throw new IOException(
             "Only ShuffledMergedInput can support multiple inputs"
                 + ". inputCount=" + ins.length);
@@ -124,15 +124,15 @@ implements Processor {
                 + ins.length + " From contex:" + inputs.size());
       }
       // initialize and merge the remaining
-      ShuffledMergedInput s0 = ((ShuffledMergedInput)in);
+      OldShuffledMergedInput s0 = ((OldShuffledMergedInput)in);
       for(int i=1; i<ins.length; ++i) {
         Input inputi = ins[i];
-        if (!(inputi instanceof ShuffledMergedInput)) {
+        if (!(inputi instanceof OldShuffledMergedInput)) {
           throw new IOException(
               "Only ShuffledMergedInput can support multiple inputs"
                   + ". inputCount=" + ins.length);
         }      
-        ShuffledMergedInput si = ((ShuffledMergedInput)inputi);
+        OldShuffledMergedInput si = ((OldShuffledMergedInput)inputi);
         s0.mergeWith(si);
       }
     }
@@ -162,10 +162,10 @@ implements Processor {
         reporter.getCounter(TaskCounter.REDUCE_INPUT_RECORDS);
         
     // Sanity check
-    if (!(in instanceof ShuffledMergedInput)) {
+    if (!(in instanceof OldShuffledMergedInput)) {
       throw new IOException("Illegal input to reduce: " + in.getClass());
     }
-    ShuffledMergedInput shuffleInput = (ShuffledMergedInput)in;
+    OldShuffledMergedInput shuffleInput = (OldShuffledMergedInput)in;
 
     if (useNewApi) {
       try {
@@ -194,7 +194,7 @@ implements Processor {
   void runOldReducer(JobConf job,
       TezTaskUmbilicalProtocol umbilical,
       final MRTaskReporter reporter,
-      ShuffledMergedInput input,
+      OldShuffledMergedInput input,
       RawComparator comparator,
       Class keyClass,
       Class valueClass,
@@ -265,7 +265,7 @@ implements Processor {
     private Counter reduceInputValueCounter;
     private Progress reducePhase;
 
-    public ReduceValuesIterator (ShuffledMergedInput in,
+    public ReduceValuesIterator (OldShuffledMergedInput in,
         RawComparator<KEY> comparator, 
         Class<KEY> keyClass,
         Class<VALUE> valClass,
@@ -297,7 +297,7 @@ implements Processor {
   void runNewReducer(JobConf job,
       final TezTaskUmbilicalProtocol umbilical,
       final MRTaskReporter reporter,
-      ShuffledMergedInput input,
+      OldShuffledMergedInput input,
       RawComparator comparator,
       Class keyClass,
       Class valueClass,

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
index ae62251..3610f9f 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
@@ -38,8 +38,7 @@ import org.apache.tez.engine.api.Task;
 import org.apache.tez.engine.common.sort.impl.IFile;
 import org.apache.tez.engine.common.task.local.output.TezLocalTaskOutputFiles;
 import org.apache.tez.engine.common.task.local.output.TezTaskOutput;
-import org.apache.tez.engine.lib.output.InMemorySortedOutput;
-import org.apache.tez.engine.lib.output.LocalOnFileSorterOutput;
+import org.apache.tez.engine.lib.oldoutput.OldLocalOnFileSorterOutput;
 import org.apache.tez.mapreduce.TestUmbilicalProtocol;
 import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
 import org.apache.tez.mapreduce.hadoop.MultiStageMRConfigUtil;
@@ -47,14 +46,9 @@ import org.apache.tez.mapreduce.hadoop.mapreduce.TezNullOutputCommitter;
 import org.apache.tez.mapreduce.input.SimpleInput;
 import org.apache.tez.mapreduce.processor.MRTask;
 import org.apache.tez.mapreduce.processor.MapUtils;
-import org.jboss.netty.buffer.BigEndianHeapChannelBuffer;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.TruncatedChannelBuffer;
-import org.jboss.netty.handler.stream.ChunkedStream;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Test;
 
 @SuppressWarnings("deprecation")
@@ -122,7 +116,7 @@ public class TestMapProcessor {
         Collections.singletonList(new InputSpec("NullVertex", 0,
             SimpleInput.class.getName())),
         Collections.singletonList(new OutputSpec("FakeVertex", 1,
-            LocalOnFileSorterOutput.class.getName())));
+            OldLocalOnFileSorterOutput.class.getName())));
 
     MRTask mrTask = (MRTask)t.getProcessor();
     Assert.assertEquals(TezNullOutputCommitter.class.getName(), mrTask
@@ -151,76 +145,76 @@ public class TestMapProcessor {
     reader.close();
   }
 
-  @Test
-  @Ignore
-  public void testMapProcessorWithInMemSort() throws Exception {
-    
-    String vertexName = MultiStageMRConfigUtil.getInitialMapVertexName();
-    
-    final int partitions = 2;
-    JobConf jobConf = new JobConf(defaultConf);
-    jobConf.setNumReduceTasks(partitions);
-    setUpJobConf(jobConf);
-    TezTaskOutput mapOutputs = new TezLocalTaskOutputFiles();
-    mapOutputs.setConf(jobConf);
-    
-    Configuration conf = MultiStageMRConfToTezTranslator.convertMRToLinearTez(jobConf);
-    Configuration stageConf = MultiStageMRConfigUtil.getConfForVertex(conf,
-        vertexName);
-    
-    JobConf job = new JobConf(stageConf);
-
-    job.set(TezJobConfig.TASK_LOCAL_RESOURCE_DIR, new Path(workDir,
-        "localized-resources").toUri().toString());
-    localFs.delete(workDir, true);
-    Task t =
-        MapUtils.runMapProcessor(
-            localFs, workDir, job, 0, new Path(workDir, "map0"), 
-            new TestUmbilicalProtocol(true), vertexName, 
-            Collections.singletonList(new InputSpec("NullVertex", 0,
-                SimpleInput.class.getName())),
-            Collections.singletonList(new OutputSpec("FakeVertex", 1,
-                InMemorySortedOutput.class.getName()))
-            );
-    InMemorySortedOutput[] outputs = (InMemorySortedOutput[])t.getOutputs();
-    
-    verifyInMemSortedStream(outputs[0], 0, 4096);
-    int i = 0;
-    for (i = 2; i < 256; i <<= 1) {
-      verifyInMemSortedStream(outputs[0], 0, i);
-    }
-    verifyInMemSortedStream(outputs[0], 1, 4096);
-    for (i = 2; i < 256; i <<= 1) {
-      verifyInMemSortedStream(outputs[0], 1, i);
-    }
-
-    t.close();
-  }
-  
-  private void verifyInMemSortedStream(
-      InMemorySortedOutput output, int partition, int chunkSize) 
-          throws Exception {
-    ChunkedStream cs = 
-        new ChunkedStream(
-            output.getSorter().getSortedStream(partition), chunkSize);
-    int actualBytes = 0;
-    ChannelBuffer b = null;
-    while ((b = (ChannelBuffer)cs.nextChunk()) != null) {
-      LOG.info("b = " + b);
-      actualBytes += 
-          (b instanceof TruncatedChannelBuffer) ? 
-              ((TruncatedChannelBuffer)b).capacity() :
-              ((BigEndianHeapChannelBuffer)b).readableBytes();
-    }
-    
-    LOG.info("verifyInMemSortedStream" +
-    		" partition=" + partition + 
-    		" chunkSize=" + chunkSize +
-        " expected=" + 
-    		output.getSorter().getShuffleHeader(partition).getCompressedLength() + 
-        " actual=" + actualBytes);
-    Assert.assertEquals(
-        output.getSorter().getShuffleHeader(partition).getCompressedLength(), 
-        actualBytes);
-  }
+//  @Test
+//  @Ignore
+//  public void testMapProcessorWithInMemSort() throws Exception {
+//    
+//    String vertexName = MultiStageMRConfigUtil.getInitialMapVertexName();
+//    
+//    final int partitions = 2;
+//    JobConf jobConf = new JobConf(defaultConf);
+//    jobConf.setNumReduceTasks(partitions);
+//    setUpJobConf(jobConf);
+//    TezTaskOutput mapOutputs = new TezLocalTaskOutputFiles();
+//    mapOutputs.setConf(jobConf);
+//    
+//    Configuration conf = MultiStageMRConfToTezTranslator.convertMRToLinearTez(jobConf);
+//    Configuration stageConf = MultiStageMRConfigUtil.getConfForVertex(conf,
+//        vertexName);
+//    
+//    JobConf job = new JobConf(stageConf);
+//
+//    job.set(TezJobConfig.TASK_LOCAL_RESOURCE_DIR, new Path(workDir,
+//        "localized-resources").toUri().toString());
+//    localFs.delete(workDir, true);
+//    Task t =
+//        MapUtils.runMapProcessor(
+//            localFs, workDir, job, 0, new Path(workDir, "map0"), 
+//            new TestUmbilicalProtocol(true), vertexName, 
+//            Collections.singletonList(new InputSpec("NullVertex", 0,
+//                SimpleInput.class.getName())),
+//            Collections.singletonList(new OutputSpec("FakeVertex", 1,
+//                OldInMemorySortedOutput.class.getName()))
+//            );
+//    OldInMemorySortedOutput[] outputs = (OldInMemorySortedOutput[])t.getOutputs();
+//    
+//    verifyInMemSortedStream(outputs[0], 0, 4096);
+//    int i = 0;
+//    for (i = 2; i < 256; i <<= 1) {
+//      verifyInMemSortedStream(outputs[0], 0, i);
+//    }
+//    verifyInMemSortedStream(outputs[0], 1, 4096);
+//    for (i = 2; i < 256; i <<= 1) {
+//      verifyInMemSortedStream(outputs[0], 1, i);
+//    }
+//
+//    t.close();
+//  }
+//  
+//  private void verifyInMemSortedStream(
+//      OldInMemorySortedOutput output, int partition, int chunkSize) 
+//          throws Exception {
+//    ChunkedStream cs = 
+//        new ChunkedStream(
+//            output.getSorter().getSortedStream(partition), chunkSize);
+//    int actualBytes = 0;
+//    ChannelBuffer b = null;
+//    while ((b = (ChannelBuffer)cs.nextChunk()) != null) {
+//      LOG.info("b = " + b);
+//      actualBytes += 
+//          (b instanceof TruncatedChannelBuffer) ? 
+//              ((TruncatedChannelBuffer)b).capacity() :
+//              ((BigEndianHeapChannelBuffer)b).readableBytes();
+//    }
+//    
+//    LOG.info("verifyInMemSortedStream" +
+//    		" partition=" + partition + 
+//    		" chunkSize=" + chunkSize +
+//        " expected=" + 
+//    		output.getSorter().getShuffleHeader(partition).getCompressedLength() + 
+//        " actual=" + actualBytes);
+//    Assert.assertEquals(
+//        output.getSorter().getShuffleHeader(partition).getCompressedLength(), 
+//        actualBytes);
+//  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
index 2428000..2a121a6 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
@@ -40,8 +40,8 @@ import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.engine.api.Task;
 import org.apache.tez.engine.common.task.local.output.TezLocalTaskOutputFiles;
 import org.apache.tez.engine.common.task.local.output.TezTaskOutput;
-import org.apache.tez.engine.lib.input.LocalMergedInput;
-import org.apache.tez.engine.lib.output.LocalOnFileSorterOutput;
+import org.apache.tez.engine.lib.oldinput.LocalMergedInput;
+import org.apache.tez.engine.lib.oldoutput.OldLocalOnFileSorterOutput;
 import org.apache.tez.engine.runtime.RuntimeUtils;
 import org.apache.tez.mapreduce.TestUmbilicalProtocol;
 import org.apache.tez.mapreduce.TezTestUtils;
@@ -122,7 +122,7 @@ public class TestReduceProcessor {
         Collections.singletonList(new InputSpec("NullVertex", 0,
             SimpleInput.class.getName())),
         Collections.singletonList(new OutputSpec("FakeVertex", 1,
-            LocalOnFileSorterOutput.class.getName())));
+            OldLocalOnFileSorterOutput.class.getName())));
 
     LOG.info("Starting reduce...");
     

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
----------------------------------------------------------------------
diff --git a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java b/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
index 0fb823f..2d59b18 100644
--- a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
+++ b/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
@@ -95,8 +95,8 @@ import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGStatus;
-import org.apache.tez.engine.lib.input.ShuffledMergedInput;
-import org.apache.tez.engine.lib.output.OnFileSortedOutput;
+import org.apache.tez.engine.lib.oldinput.OldShuffledMergedInput;
+import org.apache.tez.engine.lib.oldoutput.OldOnFileSortedOutput;
 import org.apache.tez.mapreduce.hadoop.DeprecatedKeys;
 import org.apache.tez.mapreduce.hadoop.MRHelpers;
 import org.apache.tez.mapreduce.hadoop.MRJobConfig;
@@ -463,8 +463,8 @@ public class YARNRunner implements ClientProtocol {
         EdgeProperty edgeProperty = new EdgeProperty(
             DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED,
             SchedulingType.SEQUENTIAL, 
-            new OutputDescriptor(OnFileSortedOutput.class.getName()),
-            new InputDescriptor(ShuffledMergedInput.class.getName()));
+            new OutputDescriptor(OldOnFileSortedOutput.class.getName()),
+            new InputDescriptor(OldShuffledMergedInput.class.getName()));
 
         Edge edge = null;
         edge = new Edge(vertices[i - 1], vertices[i], edgeProperty);


Mime
View raw message