tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject [8/9] TEZ-1479. Disambiguate (refactor) between ShuffleInputEventHandlers and Fetchers. (sseth) (cherry picked from commit 7be5830a908602ff91a07d3020f2dddf7705d48f)
Date Wed, 15 Oct 2014 18:59:44 GMT
http://git-wip-us.apache.org/repos/asf/tez/blob/196bcca9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/MemoryFetchedInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/MemoryFetchedInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/MemoryFetchedInput.java
new file mode 100644
index 0000000..e25a325
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/MemoryFetchedInput.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.common.shuffle;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.hadoop.io.BoundedByteArrayOutputStream;
+import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * A {@link FetchedInput} that buffers the fetched bytes in memory, in a
+ * {@link BoundedByteArrayOutputStream} whose capacity is {@code actualSize}.
+ */
+public class MemoryFetchedInput extends FetchedInput {
+
+  // Backing buffer. Set to null by free() once a committed input is released.
+  private BoundedByteArrayOutputStream byteStream;
+
+  /**
+   * Creates the input and allocates its in-memory buffer.
+   *
+   * @param actualSize capacity of the buffer in bytes; must fit in an int
+   * @param compressedSize forwarded to the {@link FetchedInput} constructor
+   * @param inputAttemptIdentifier forwarded to the {@link FetchedInput} constructor
+   * @param callbackHandler forwarded to the {@link FetchedInput} constructor
+   * @throws IllegalArgumentException if {@code actualSize} is negative or
+   *           greater than {@link Integer#MAX_VALUE}
+   */
+  public MemoryFetchedInput(long actualSize, long compressedSize,
+      InputAttemptIdentifier inputAttemptIdentifier,
+      FetchedInputCallback callbackHandler) {
+    super(Type.MEMORY, actualSize, compressedSize, inputAttemptIdentifier, callbackHandler);
+    // The buffer is int-sized. A bare (int) cast would silently wrap a larger
+    // value and allocate a buffer of the wrong size, so reject it explicitly.
+    Preconditions.checkArgument(actualSize >= 0 && actualSize <= Integer.MAX_VALUE,
+        "actualSize must be in [0, %s] but was %s", Integer.MAX_VALUE, actualSize);
+    this.byteStream = new BoundedByteArrayOutputStream((int) actualSize);
+  }
+
+  /** Returns the stream that writes into the backing buffer. */
+  @Override
+  public OutputStream getOutputStream() {
+    return byteStream;
+  }
+
+  /** Returns a new stream over the backing buffer's byte array. */
+  @Override
+  public InputStream getInputStream() {
+    return new ByteArrayInputStream(byteStream.getBuffer());
+  }
+
+  /** Returns the backing buffer's byte array. */
+  public byte[] getBytes() {
+    return byteStream.getBuffer();
+  }
+
+  /** Moves a PENDING input to COMMITTED and calls notifyFetchComplete(); otherwise a no-op. */
+  @Override
+  public void commit() {
+    if (state == State.PENDING) {
+      state = State.COMMITTED;
+      notifyFetchComplete();
+    }
+  }
+
+  /** Moves a PENDING input to ABORTED and calls notifyFetchFailure(); otherwise a no-op. */
+  @Override
+  public void abort() {
+    if (state == State.PENDING) {
+      state = State.ABORTED;
+      notifyFetchFailure();
+    }
+  }
+
+  /**
+   * Drops the buffer of a COMMITTED input, marks it FREED and calls
+   * notifyFreedResource(). Does nothing for an ABORTED input.
+   *
+   * @throws IllegalStateException if the input is neither COMMITTED nor ABORTED
+   */
+  @Override
+  public void free() {
+    Preconditions.checkState(
+        state == State.COMMITTED || state == State.ABORTED,
+        "FetchedInput can only be freed after it is committed or aborted");
+    if (state == State.COMMITTED) { // ABORTED would have already called cleanup
+      state = State.FREED;
+      this.byteStream = null;
+      notifyFreedResource();
+    }
+  }
+
+  @Override
+  public String toString() {
+    return "MemoryFetchedInput [inputAttemptIdentifier="
+        + inputAttemptIdentifier + ", actualSize=" + actualSize
+        + ", compressedSize=" + compressedSize + ", type=" + type + ", id="
+        + id + ", state=" + state + "]";
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/196bcca9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleEventHandler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleEventHandler.java
new file mode 100644
index 0000000..ff66158
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleEventHandler.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.common.shuffle;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.tez.runtime.api.Event;
+
+/** Receiver for batches of {@link Event}s. */
+public interface ShuffleEventHandler {
+
+  /**
+   * Processes the given events.
+   *
+   * @param events the events to process
+   * @throws IOException declared so that implementations can report failures
+   */
+  void handleEvents(List<Event> events) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/196bcca9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
new file mode 100644
index 0000000..7af0b71
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
@@ -0,0 +1,344 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.common.shuffle;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import javax.crypto.SecretKey;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DataInputByteBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.security.token.Token;
+import org.apache.tez.common.security.JobTokenIdentifier;
+import org.apache.tez.common.security.JobTokenSecretManager;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
+import org.apache.tez.runtime.library.common.sort.impl.IFile;
+import org.apache.tez.runtime.library.common.shuffle.HttpConnection.HttpConnectionParams;
+import org.apache.tez.runtime.library.common.shuffle.HttpConnection.HttpConnectionParamsBuilder;
+import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataMovementEventPayloadProto;
+
+/**
+ * Static helpers for the shuffle code: job token (de)serialization, copying
+ * fetched data to memory or disk, and building shuffle URLs and HTTP
+ * connection parameters.
+ */
+public class ShuffleUtils {
+
+  private static final Log LOG = LogFactory.getLog(ShuffleUtils.class);
+  // NOTE(review): public and non-final, so any caller can reassign it. Consider
+  // making it final once it is confirmed that nothing writes to it.
+  public static String SHUFFLE_HANDLER_SERVICE_ID = "mapreduce_shuffle";
+
+  /**
+   * Reads a serialized job token from {@code meta} and derives a secret key
+   * from the token's password.
+   *
+   * @param meta buffer holding the serialized {@link Token}
+   * @return the key built by {@code JobTokenSecretManager.createSecretKey}
+   * @throws IOException if the token cannot be read
+   */
+  public static SecretKey getJobTokenSecretFromTokenBytes(ByteBuffer meta)
+      throws IOException {
+    DataInputByteBuffer in = new DataInputByteBuffer();
+    in.reset(meta);
+    Token<JobTokenIdentifier> jt = new Token<JobTokenIdentifier>();
+    jt.readFields(in);
+    SecretKey sk = JobTokenSecretManager.createSecretKey(jt.getPassword());
+    return sk;
+  }
+
+  /**
+   * Serializes {@code jobToken} and returns a buffer over the written bytes.
+   *
+   * @param jobToken the token to serialize
+   * @return a buffer wrapping the serialized form of the token
+   * @throws IOException if the token cannot be written
+   */
+  public static ByteBuffer convertJobTokenToBytes(
+      Token<JobTokenIdentifier> jobToken) throws IOException {
+    DataOutputBuffer dob = new DataOutputBuffer();
+    jobToken.write(dob);
+    // Only the first getLength() bytes of getData() hold written data.
+    ByteBuffer bb = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
+    return bb;
+  }
+
+  /**
+   * Reads a single int from {@code meta} and returns it as the port.
+   *
+   * @param meta buffer to read the int from
+   * @return the int read from the buffer
+   * @throws IOException if the int cannot be read
+   */
+  public static int deserializeShuffleProviderMetaData(ByteBuffer meta)
+      throws IOException {
+    DataInputByteBuffer in = new DataInputByteBuffer();
+    try {
+      in.reset(meta);
+      int port = in.readInt();
+      return port;
+    } finally {
+      in.close();
+    }
+  }
+
+  /**
+   * Reads one fetched input into {@code shuffleData} through
+   * {@code IFile.Reader.readToMemory}. If that fails with an
+   * {@link IOException}, {@code input} is closed and the exception rethrown.
+   *
+   * @param shuffleData destination array
+   * @param input stream to read from
+   * @param decompressedLength not used by this method
+   * @param compressedLength forwarded to {@code readToMemory}
+   * @param codec forwarded to {@code readToMemory}
+   * @param ifileReadAhead forwarded to {@code readToMemory}
+   * @param ifileReadAheadLength forwarded to {@code readToMemory}
+   * @param LOG logger to use; hides this class's own {@code LOG} field
+   * @param identifier label for the input, used in the log message
+   * @throws IOException if the read fails
+   */
+  @SuppressWarnings("resource")
+  public static void shuffleToMemory(byte[] shuffleData,
+      InputStream input, int decompressedLength, int compressedLength,
+      CompressionCodec codec, boolean ifileReadAhead, int ifileReadAheadLength,
+      Log LOG, String identifier) throws IOException {
+    try {
+      IFile.Reader.readToMemory(shuffleData, input, compressedLength, codec,
+        ifileReadAhead, ifileReadAheadLength);
+      // metrics.inputBytes(shuffleData.length);
+      LOG.info("Read " + shuffleData.length + " bytes from input for "
+          + identifier);
+    } catch (IOException ioe) {
+      // Close the streams
+      IOUtils.cleanup(LOG, input);
+      // Re-throw
+      throw ioe;
+    }
+  }
+
+  /**
+   * Copies {@code compressedLength} bytes from {@code input} to
+   * {@code output} and closes {@code output} on success. On an
+   * {@link IOException} both streams are closed and the exception rethrown.
+   *
+   * @param output destination stream
+   * @param hostIdentifier source host, used only in an error message
+   * @param input stream to copy from
+   * @param compressedLength number of bytes to copy
+   * @param LOG logger to use; hides this class's own {@code LOG} field
+   * @param identifier label for the input, used in log and error messages
+   * @throws IOException if the stream ends early or the copy fails
+   */
+  public static void shuffleToDisk(OutputStream output, String hostIdentifier,
+      InputStream input, long compressedLength, Log LOG, String identifier)
+      throws IOException {
+    // Copy data to local-disk
+    long bytesLeft = compressedLength;
+    try {
+      final int BYTES_TO_READ = 64 * 1024;
+      byte[] buf = new byte[BYTES_TO_READ];
+      while (bytesLeft > 0) {
+        int n = input.read(buf, 0, (int) Math.min(bytesLeft, BYTES_TO_READ));
+        if (n < 0) {
+          throw new IOException("read past end of stream reading "
+              + identifier);
+        }
+        output.write(buf, 0, n);
+        bytesLeft -= n;
+        // metrics.inputBytes(n);
+      }
+
+      LOG.info("Read " + (compressedLength - bytesLeft)
+          + " bytes from input for " + identifier);
+
+      output.close();
+    } catch (IOException ioe) {
+      // Close the streams
+      IOUtils.cleanup(LOG, input, output);
+      // Re-throw
+      throw ioe;
+    }
+
+    // Sanity check
+    if (bytesLeft != 0) {
+      throw new IOException("Incomplete map output received for " +
+          identifier + " from " +
+          hostIdentifier + " (" +
+          bytesLeft + " bytes missing of " +
+          compressedLength + ")");
+    }
+  }
+
+  // TODO NEWTEZ handle ssl shuffle
+  /**
+   * Joins {@code host} and {@code port} as {@code host:port} and delegates to
+   * {@link #constructBaseURIForShuffleHandler(String, int, String, boolean)}.
+   */
+  public static StringBuilder constructBaseURIForShuffleHandler(String host,
+      int port, int partition, String appId, boolean sslShuffle) {
+    return constructBaseURIForShuffleHandler(host + ":" + String.valueOf(port),
+      partition, appId, sslShuffle);
+  }
+
+  /**
+   * Builds the URL prefix
+   * {@code <scheme><hostIdentifier>/mapOutput?job=<job>&reduce=<partition>&map=}.
+   *
+   * @param hostIdentifier host part of the URL, inserted as given
+   * @param partition value of the {@code reduce} query parameter
+   * @param appId application id; every "application" in it is replaced by "job"
+   * @param sslShuffle selects {@code https://} when true, {@code http://} otherwise
+   * @return a builder holding the prefix, ending in {@code &map=}
+   */
+  public static StringBuilder constructBaseURIForShuffleHandler(String hostIdentifier,
+      int partition, String appId, boolean sslShuffle) {
+    final String http_protocol = (sslShuffle) ? "https://" : "http://";
+    StringBuilder sb = new StringBuilder(http_protocol);
+    sb.append(hostIdentifier);
+    sb.append("/");
+    sb.append("mapOutput?job=");
+    sb.append(appId.replace("application", "job"));
+    sb.append("&reduce=");
+    sb.append(String.valueOf(partition));
+    sb.append("&map=");
+    return sb;
+  }
+
+  /**
+   * Appends the comma-separated path components of {@code inputs} to
+   * {@code baseURI}, plus {@code &keepAlive=true} when requested.
+   *
+   * @param baseURI URL prefix to extend
+   * @param inputs inputs whose path components are appended
+   * @param keepAlive whether to add the {@code keepAlive} query parameter
+   * @return the resulting URL
+   * @throws MalformedURLException if the resulting string is not a valid URL
+   */
+  public static URL constructInputURL(String baseURI,
+      List<InputAttemptIdentifier> inputs, boolean keepAlive) throws MalformedURLException {
+    StringBuilder url = new StringBuilder(baseURI);
+    boolean first = true;
+    for (InputAttemptIdentifier input : inputs) {
+      if (first) {
+        first = false;
+        url.append(input.getPathComponent());
+      } else {
+        url.append(",").append(input.getPathComponent());
+      }
+    }
+    //It is possible to override keep-alive setting in cluster by adding keepAlive in url.
+    //Refer MAPREDUCE-5787 to enable/disable keep-alive in the cluster.
+    if (keepAlive) {
+      url.append("&keepAlive=true");
+    }
+    return new URL(url.toString());
+  }
+
+  /**
+   * Builds the HTTP connection parameters for shuffle from {@code conf}.
+   * When keep-alive is enabled this also sets the JVM-wide system properties
+   * {@code sun.net.http.errorstream.enableBuffering} and
+   * {@code http.maxConnections}.
+   *
+   * @param conf configuration to read the shuffle settings from
+   * @return the parameters produced by the builder
+   */
+  public static HttpConnectionParams constructHttpShuffleConnectionParams(
+      Configuration conf) {
+    HttpConnectionParamsBuilder builder = new HttpConnectionParamsBuilder();
+
+    // NOTE(review): the connect timeout falls back to the STALLED_COPY default,
+    // not a CONNECT-specific one - confirm this pairing is intended.
+    int connectionTimeout =
+        conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_CONNECT_TIMEOUT,
+          TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_STALLED_COPY_TIMEOUT_DEFAULT);
+
+    int readTimeout =
+        conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_READ_TIMEOUT,
+          TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_READ_TIMEOUT_DEFAULT);
+
+    int bufferSize =
+        conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_BUFFER_SIZE,
+          TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_BUFFER_SIZE_DEFAULT);
+
+    boolean keepAlive =
+        conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_KEEP_ALIVE_ENABLED,
+          TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_KEEP_ALIVE_ENABLED_DEFAULT);
+    int keepAliveMaxConnections =
+        conf.getInt(
+          TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_KEEP_ALIVE_MAX_CONNECTIONS,
+          TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_KEEP_ALIVE_MAX_CONNECTIONS_DEFAULT);
+    if (keepAlive) {
+      System.setProperty("sun.net.http.errorstream.enableBuffering", "true");
+      System.setProperty("http.maxConnections",
+        String.valueOf(keepAliveMaxConnections));
+      LOG.info("Set keepAlive max connections: " + keepAliveMaxConnections);
+    }
+
+    builder.setTimeout(connectionTimeout, readTimeout)
+        .setBufferSize(bufferSize)
+        .setKeepAlive(keepAlive, keepAliveMaxConnections);
+
+    boolean sslShuffle = conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ENABLE_SSL,
+      TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ENABLE_SSL_DEFAULT);
+    builder.setSSL(sslShuffle, conf);
+
+    return builder.build();
+  }
+
+  /**
+   * Renders selected fields of {@code dmProto} as a bracketed,
+   * comma-separated list.
+   *
+   * @param dmProto the payload to describe
+   * @return text such as {@code [host: h, port: 1, pathComponent: p, runDuration: 0]}
+   */
+  public static String stringify(DataMovementEventPayloadProto dmProto) {
+    StringBuilder sb = new StringBuilder();
+    sb.append("[");
+    if (dmProto.hasEmptyPartitions()) {
+      sb.append("hasEmptyPartitions: ").append(dmProto.hasEmptyPartitions()).append(", ");
+    }
+    sb.append("host: ").append(dmProto.getHost()).append(", ");
+    sb.append("port: ").append(dmProto.getPort()).append(", ");
+    sb.append("pathComponent: ").append(dmProto.getPathComponent()).append(", ");
+    // No separator after the last mandatory field: the optional field below
+    // brings its own, so the output never ends in ", " or contains ", , ".
+    sb.append("runDuration: ").append(dmProto.getRunDuration());
+    if (dmProto.hasData()) {
+      sb.append(", ").append("hasDataInEvent: ").append(dmProto.hasData());
+    }
+    sb.append("]");
+    return sb.toString();
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/tez/blob/196bcca9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ExceptionReporter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ExceptionReporter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ExceptionReporter.java
deleted file mode 100644
index a872ba1..0000000
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ExceptionReporter.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tez.runtime.library.common.shuffle.impl;
-
-/**
- * An interface for reporting exceptions to other threads
- */
-interface ExceptionReporter {
-  void reportException(Throwable t);
-}

http://git-wip-us.apache.org/repos/asf/tez/blob/196bcca9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java
deleted file mode 100644
index ea1bac3..0000000
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java
+++ /dev/null
@@ -1,663 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tez.runtime.library.common.shuffle.impl;
-
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.net.HttpURLConnection;
-import java.net.SocketTimeoutException;
-import java.net.URL;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.LinkedHashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Set;
-
-import javax.crypto.SecretKey;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.LocalDirAllocator;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.tez.common.TezUtilsInternal;
-import org.apache.hadoop.yarn.api.ApplicationConstants;
-import org.apache.tez.common.TezRuntimeFrameworkConfigs;
-import org.apache.tez.common.counters.TezCounter;
-import org.apache.tez.runtime.api.InputContext;
-import org.apache.tez.runtime.library.common.Constants;
-import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
-import org.apache.tez.runtime.library.common.shuffle.impl.MapOutput.Type;
-import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
-import org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord;
-import org.apache.tez.runtime.library.exceptions.FetcherReadTimeoutException;
-import org.apache.tez.runtime.library.shuffle.common.HttpConnection;
-import org.apache.tez.runtime.library.shuffle.common.HttpConnection.HttpConnectionParams;
-import org.apache.tez.runtime.library.shuffle.common.ShuffleUtils;
-
-import com.google.common.annotations.VisibleForTesting;
-
-class Fetcher extends Thread {
-  
-  private static final Log LOG = LogFactory.getLog(Fetcher.class);
-  private final Configuration conf;
-  private final boolean localDiskFetchEnabled;
-
-  private static enum ShuffleErrors{IO_ERROR, WRONG_LENGTH, BAD_ID, WRONG_MAP,
-                                    CONNECTION, WRONG_REDUCE}
-  
-  private final static String SHUFFLE_ERR_GRP_NAME = "Shuffle Errors";
-  private final TezCounter connectionErrs;
-  private final TezCounter ioErrs;
-  private final TezCounter wrongLengthErrs;
-  private final TezCounter badIdErrs;
-  private final TezCounter wrongMapErrs;
-  private final TezCounter wrongReduceErrs;
-  private final MergeManager merger;
-  private final ShuffleScheduler scheduler;
-  private final ShuffleClientMetrics metrics;
-  private final Shuffle shuffle;
-  private final int id;
-  private final String logIdentifier;
-  private static int nextId = 0;
-  private int currentPartition = -1;
-
-  // Decompression of map-outputs
-  private final CompressionCodec codec;
-  private final SecretKey jobTokenSecret;
-
-  @VisibleForTesting
-  volatile boolean stopped = false;
-  
-  private final boolean ifileReadAhead;
-  private final int ifileReadAheadLength;
-  private LinkedHashSet<InputAttemptIdentifier> remaining;
-
-  volatile HttpURLConnection connection;
-  volatile DataInputStream input;
-
-  HttpConnection httpConnection;
-  HttpConnectionParams httpConnectionParams;
-
-  final static String localhostName = NetUtils.getHostname();
-
-  // Initiative value is 0, which means it hasn't retried yet.
-  private long retryStartTime = 0;
-  
-  public Fetcher(HttpConnectionParams httpConnectionParams,
-                 ShuffleScheduler scheduler, MergeManager merger,
-                 ShuffleClientMetrics metrics,
-                 Shuffle shuffle, SecretKey jobTokenSecret,
-                 boolean ifileReadAhead, int ifileReadAheadLength, CompressionCodec codec,
-                 InputContext inputContext, Configuration conf, boolean localDiskFetchEnabled) throws IOException {
-    setDaemon(true);
-    this.scheduler = scheduler;
-    this.merger = merger;
-    this.metrics = metrics;
-    this.shuffle = shuffle;
-    this.id = ++nextId;
-    this.jobTokenSecret = jobTokenSecret;
-    ioErrs = inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME,
-        ShuffleErrors.IO_ERROR.toString());
-    wrongLengthErrs = inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME,
-        ShuffleErrors.WRONG_LENGTH.toString());
-    badIdErrs = inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME,
-        ShuffleErrors.BAD_ID.toString());
-    wrongMapErrs = inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME,
-        ShuffleErrors.WRONG_MAP.toString());
-    connectionErrs = inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME,
-        ShuffleErrors.CONNECTION.toString());
-    wrongReduceErrs = inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME,
-        ShuffleErrors.WRONG_REDUCE.toString());
-
-    this.ifileReadAhead = ifileReadAhead;
-    this.ifileReadAheadLength = ifileReadAheadLength;
-    this.httpConnectionParams = httpConnectionParams;
-    if (codec != null) {
-      this.codec = codec;
-    } else {
-      this.codec = null;
-    }
-    this.conf = conf;
-
-    this.localDiskFetchEnabled = localDiskFetchEnabled;
-
-    this.logIdentifier = "fetcher [" + TezUtilsInternal
-        .cleanVertexName(inputContext.getSourceVertexName()) + "] #" + id + " " + localhostName;
-    setName(logIdentifier);
-    setDaemon(true);
-  }  
-
-  public void run() {
-    try {
-      while (!stopped && !Thread.currentThread().isInterrupted()) {
-        remaining = null; // Safety.
-        MapHost host = null;
-        try {
-          // If merge is on, block
-          merger.waitForInMemoryMerge();
-
-          // Get a host to shuffle from
-          host = scheduler.getHost();
-          metrics.threadBusy();
-
-          String hostPort = host.getHostIdentifier();
-          String hostname = hostPort.substring(0, hostPort.indexOf(":"));
-          if (localDiskFetchEnabled &&
-              hostname.equals(System.getenv(ApplicationConstants.Environment.NM_HOST.toString()))) {
-            setupLocalDiskFetch(host);
-          } else {
-            // Shuffle
-            copyFromHost(host);
-          }
-        } finally {
-          cleanupCurrentConnection(false);
-          if (host != null) {
-            scheduler.freeHost(host);
-            metrics.threadFree();
-          }
-        }
-      }
-    } catch (InterruptedException ie) {
-      return;
-    } catch (Throwable t) {
-      shuffle.reportException(t);
-    }
-  }
-
-  public void shutDown() throws InterruptedException {
-    this.stopped = true;
-    interrupt();
-    cleanupCurrentConnection(true);
-    try {
-      join(5000);
-    } catch (InterruptedException ie) {
-      LOG.warn("Got interrupt while joining " + getName(), ie);
-    }
-  }
-
-  private Object cleanupLock = new Object();
-  private void cleanupCurrentConnection(boolean disconnect) {
-    // Synchronizing on cleanupLock to ensure we don't run into a parallel close
-    // Can't synchronize on the main class itself since that would cause the
-    // shutdown request to block
-    synchronized (cleanupLock) {
-      try {
-        if (httpConnection != null) {
-          httpConnection.cleanup(disconnect);
-        }
-      } catch (IOException e) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Exception while shutting down fetcher " + logIdentifier, e);
-        } else {
-          LOG.info("Exception while shutting down fetcher " + logIdentifier + ": " + e.getMessage());
-        }
-      }
-    }
-  }
-
-  /**
-   * The crux of the matter...
-   * 
-   * @param host {@link MapHost} from which we need to  
-   *              shuffle available map-outputs.
-   */
-  @VisibleForTesting
-  protected void copyFromHost(MapHost host) throws IOException {
-    // reset retryStartTime for a new host
-    retryStartTime = 0;
-    // Get completed maps on 'host'
-    List<InputAttemptIdentifier> srcAttempts = scheduler.getMapsForHost(host);
-    currentPartition = host.getPartitionId();
-    
-    // Sanity check to catch hosts with only 'OBSOLETE' maps, 
-    // especially at the tail of large jobs
-    if (srcAttempts.size() == 0) {
-      return;
-    }
-    
-    if(LOG.isDebugEnabled()) {
-      LOG.debug("Fetcher " + id + " going to fetch from " + host + " for: "
-        + srcAttempts + ", partitionId: " + currentPartition);
-    }
-    
-    // List of maps to be fetched yet
-    remaining = new LinkedHashSet<InputAttemptIdentifier>(srcAttempts);
-    
-    // Construct the url and connect
-    if (!setupConnection(host, srcAttempts)) {
-      if (stopped) {
-        cleanupCurrentConnection(true);
-      }
-      // Add back all remaining maps - which at this point is ALL MAPS the
-      // Fetcher was started with. The Scheduler takes care of retries,
-      // reporting too many failures etc.
-      putBackRemainingMapOutputs(host);
-      return;
-    }
-
-    try {
-      // Loop through available map-outputs and fetch them
-      // On any error, faildTasks is not null and we exit
-      // after putting back the remaining maps to the 
-      // yet_to_be_fetched list and marking the failed tasks.
-      InputAttemptIdentifier[] failedTasks = null;
-      while (!remaining.isEmpty() && failedTasks == null) {
-        // fail immediately after first failure because we dont know how much to
-        // skip for this error in the input stream. So we cannot move on to the 
-        // remaining outputs. YARN-1773. Will get to them in the next retry.
-        try {
-          failedTasks = copyMapOutput(host, input);
-        } catch (FetcherReadTimeoutException e) {
-          // Setup connection again if disconnected
-          cleanupCurrentConnection(true);
-
-          // Connect with retry
-          if (!setupConnection(host, new LinkedList<InputAttemptIdentifier>(remaining))) {
-            if (stopped) {
-              cleanupCurrentConnection(true);
-            }
-            failedTasks = new InputAttemptIdentifier[] {getNextRemainingAttempt()};
-            break;
-          }
-        }
-      }
-      
-      if(failedTasks != null && failedTasks.length > 0) {
-        LOG.warn("copyMapOutput failed for tasks "+Arrays.toString(failedTasks));
-        for(InputAttemptIdentifier left: failedTasks) {
-          scheduler.copyFailed(left, host, true, false);
-        }
-      }
-
-      cleanupCurrentConnection(false);
-
-      // Sanity check
-      if (failedTasks == null && !remaining.isEmpty()) {
-        throw new IOException("server didn't return all expected map outputs: "
-            + remaining.size() + " left.");
-      }
-    } finally {
-      putBackRemainingMapOutputs(host);
-    }
-  }
-
-  @VisibleForTesting
-  boolean setupConnection(MapHost host, List<InputAttemptIdentifier> attempts)
-      throws IOException {
-    boolean connectSucceeded = false;
-    try {
-      URL url = ShuffleUtils.constructInputURL(host.getBaseUrl(), attempts,
-          httpConnectionParams.getKeepAlive());
-      httpConnection = new HttpConnection(url, httpConnectionParams,
-          logIdentifier, jobTokenSecret);
-      connectSucceeded = httpConnection.connect();
-
-      if (stopped) {
-        LOG.info("Detected fetcher has been shutdown after connection establishment. Returning");
-        return false;
-      }
-      input = httpConnection.getInputStream();
-      httpConnection.validate();
-      return true;
-    } catch (IOException ie) {
-      if (stopped) {
-        LOG.info("Not reporting fetch failure, since an Exception was caught after shutdown");
-        return false;
-      }
-      ioErrs.increment(1);
-      if (!connectSucceeded) {
-        LOG.warn("Failed to connect to " + host + " with " + remaining.size() + " inputs", ie);
-        connectionErrs.increment(1);
-      } else {
-        LOG.warn("Failed to verify reply after connecting to " + host + " with " + remaining.size()
-            + " inputs pending", ie);
-      }
-
-      // At this point, either the connection failed, or the initial header verification failed.
-      // The error does not relate to any specific Input. Report all of them as failed.
-      // This ends up indirectly penalizing the host (multiple failures reported on the single host)
-      for(InputAttemptIdentifier left: remaining) {
-        // Need to be handling temporary glitches ..
-        // Report read error to the AM to trigger source failure heuristics
-        scheduler.copyFailed(left, host, connectSucceeded, !connectSucceeded);
-      }
-      return false;
-    }
-  }
-
-  @VisibleForTesting
-  protected void putBackRemainingMapOutputs(MapHost host) {
-    // Cycle through remaining MapOutputs
-    boolean isFirst = true;
-    InputAttemptIdentifier first = null;
-    for (InputAttemptIdentifier left : remaining) {
-      if (isFirst) {
-        first = left;
-        isFirst = false;
-        continue;
-      }
-      scheduler.putBackKnownMapOutput(host, left);
-    }
-    if (first != null) { // Empty remaining list.
-      scheduler.putBackKnownMapOutput(host, first);
-    }
-  }
-
-  private static InputAttemptIdentifier[] EMPTY_ATTEMPT_ID_ARRAY = new InputAttemptIdentifier[0];
-  
-  protected InputAttemptIdentifier[] copyMapOutput(MapHost host,
-                                DataInputStream input) throws FetcherReadTimeoutException {
-    MapOutput mapOutput = null;
-    InputAttemptIdentifier srcAttemptId = null;
-    long decompressedLength = -1;
-    long compressedLength = -1;
-    
-    try {
-      long startTime = System.currentTimeMillis();
-      int forReduce = -1;
-      //Read the shuffle header
-      try {
-        ShuffleHeader header = new ShuffleHeader();
-        // TODO Review: Multiple header reads in case of status WAIT ? 
-        header.readFields(input);
-        if (!header.mapId.startsWith(InputAttemptIdentifier.PATH_PREFIX)) {
-          throw new IllegalArgumentException(
-              "Invalid header received: " + header.mapId + " partition: " + header.forReduce);
-        }
-        srcAttemptId = 
-            scheduler.getIdentifierForFetchedOutput(header.mapId, header.forReduce);
-        compressedLength = header.compressedLength;
-        decompressedLength = header.uncompressedLength;
-        forReduce = header.forReduce;
-      } catch (IllegalArgumentException e) {
-        badIdErrs.increment(1);
-        LOG.warn("Invalid map id ", e);
-        // Don't know which one was bad, so consider this one bad and dont read
-        // the remaining because we dont know where to start reading from. YARN-1773
-        return new InputAttemptIdentifier[] {getNextRemainingAttempt()};
-      }
-
-      // Do some basic sanity verification
-      if (!verifySanity(compressedLength, decompressedLength, forReduce,
-          remaining, srcAttemptId)) {
-        if (srcAttemptId == null) {
-          LOG.warn("Was expecting " + getNextRemainingAttempt() + " but got null");
-          srcAttemptId = getNextRemainingAttempt();
-        }
-        assert(srcAttemptId != null);
-        return new InputAttemptIdentifier[] {srcAttemptId};
-      }
-      
-      if(LOG.isDebugEnabled()) {
-        LOG.debug("header: " + srcAttemptId + ", len: " + compressedLength + 
-            ", decomp len: " + decompressedLength);
-      }
-      
-      // Get the location for the map output - either in-memory or on-disk
-      try {
-        mapOutput = merger.reserve(srcAttemptId, decompressedLength, compressedLength, id);
-      } catch (IOException e) {
-        // Kill the reduce attempt
-        ioErrs.increment(1);
-        scheduler.reportLocalError(e);
-        return EMPTY_ATTEMPT_ID_ARRAY;
-      }
-      
-      // Check if we can shuffle *now* ...
-      if (mapOutput.getType() == Type.WAIT) {
-        // TODO Review: Does this cause a tight loop ?
-        LOG.info("fetcher#" + id + " - MergerManager returned Status.WAIT ...");
-        //Not an error but wait to process data.
-        return EMPTY_ATTEMPT_ID_ARRAY;
-      } 
-      
-      // Go!
-      LOG.info("fetcher#" + id + " about to shuffle output of map " + 
-               mapOutput.getAttemptIdentifier() + " decomp: " +
-               decompressedLength + " len: " + compressedLength);
-      if (mapOutput.getType() == Type.MEMORY) {
-        ShuffleUtils.shuffleToMemory(mapOutput.getMemory(), input,
-          (int) decompressedLength, (int) compressedLength, codec, ifileReadAhead,
-          ifileReadAheadLength, LOG, mapOutput.getAttemptIdentifier().toString());
-      } else if (mapOutput.getType() == Type.DISK) {
-        ShuffleUtils.shuffleToDisk(mapOutput.getDisk(), host.getHostIdentifier(),
-          input, compressedLength, LOG, mapOutput.getAttemptIdentifier().toString());
-      } else {
-        throw new IOException("Unknown mapOutput type while fetching shuffle data:" +
-            mapOutput.getType());
-      }
-
-      // Inform the shuffle scheduler
-      long endTime = System.currentTimeMillis();
-      // Reset retryStartTime as map task make progress if retried before.
-      retryStartTime = 0;
-
-      scheduler.copySucceeded(srcAttemptId, host, compressedLength, decompressedLength, 
-                              endTime - startTime, mapOutput);
-      // Note successful shuffle
-      remaining.remove(srcAttemptId);
-      metrics.successFetch();
-      return null;
-    } catch (IOException ioe) {
-      if (stopped) {
-        LOG.info("Not reporting fetch failure for exception during data copy: ["
-            + ioe.getClass().getName() + ", " + ioe.getMessage() + "]");
-        cleanupCurrentConnection(true);
-        if (mapOutput != null) {
-          mapOutput.abort(); // Release resources
-        }
-        // Don't need to put back - since that's handled by the invoker
-        return EMPTY_ATTEMPT_ID_ARRAY;
-      }
-      if (shouldRetry(host, ioe)) {
-        //release mem/file handles
-        if (mapOutput != null) {
-          mapOutput.abort();
-        }
-        throw new FetcherReadTimeoutException(ioe);
-      }
-      ioErrs.increment(1);
-      if (srcAttemptId == null || mapOutput == null) {
-        LOG.info("fetcher#" + id + " failed to read map header" + 
-                 srcAttemptId + " decomp: " + 
-                 decompressedLength + ", " + compressedLength, ioe);
-        if(srcAttemptId == null) {
-          return remaining.toArray(new InputAttemptIdentifier[remaining.size()]);
-        } else {
-          return new InputAttemptIdentifier[] {srcAttemptId};
-        }
-      }
-      
-      LOG.warn("Failed to shuffle output of " + srcAttemptId + 
-               " from " + host.getHostIdentifier(), ioe); 
-
-      // Inform the shuffle-scheduler
-      mapOutput.abort();
-      metrics.failedFetch();
-      return new InputAttemptIdentifier[] {srcAttemptId};
-    }
-
-  }
-
-  /**
-   * Check connection needs to be re-established.
-   *
-   * @param host
-   * @param ioe
-   * @return true to indicate connection retry. false otherwise.
-   * @throws IOException
-   */
-  private boolean shouldRetry(MapHost host, IOException ioe) {
-    if (!(ioe instanceof SocketTimeoutException)) {
-      return false;
-    }
-    // First time to retry.
-    long currentTime = System.currentTimeMillis();
-    if (retryStartTime == 0) {
-      retryStartTime = currentTime;
-    }
-
-    if (currentTime - retryStartTime < httpConnectionParams.getReadTimeout()) {
-      LOG.warn("Shuffle output from " + host.getHostIdentifier() +
-          " failed, retry it.");
-      //retry connecting to the host
-      return true;
-    } else {
-      // timeout, prepare to be failed.
-      LOG.warn("Timeout for copying MapOutput with retry on host " + host
-          + "after " + httpConnectionParams.getReadTimeout() + "milliseconds.");
-      return false;
-    }
-  }
-  
-  /**
-   * Do some basic verification on the input received -- Being defensive
-   * @param compressedLength
-   * @param decompressedLength
-   * @param forReduce
-   * @param remaining
-   * @param srcAttemptId
-   * @return true/false, based on if the verification succeeded or not
-   */
-  private boolean verifySanity(long compressedLength, long decompressedLength,
-      int forReduce, Set<InputAttemptIdentifier> remaining, InputAttemptIdentifier srcAttemptId) {
-    if (compressedLength < 0 || decompressedLength < 0) {
-      wrongLengthErrs.increment(1);
-      LOG.warn(getName() + " invalid lengths in map output header: id: " +
-          srcAttemptId + " len: " + compressedLength + ", decomp len: " + 
-               decompressedLength);
-      return false;
-    }
-
-    // partitionId verification. Isn't availalbe here because it is encoded into
-    // URI
-    if (forReduce != currentPartition) {
-      wrongReduceErrs.increment(1);
-      LOG.warn(getName() + " data for the wrong partition map: " + srcAttemptId + " len: "
-          + compressedLength + " decomp len: " + decompressedLength + " for partition " + forReduce
-          + ", expected partition: " + currentPartition);
-      return false;
-    }
-
-    // Sanity check
-    if (!remaining.contains(srcAttemptId)) {
-      wrongMapErrs.increment(1);
-      LOG.warn("Invalid map-output! Received output for " + srcAttemptId);
-      return false;
-    }
-    
-    return true;
-  }
-  
-  private InputAttemptIdentifier getNextRemainingAttempt() {
-    if (remaining.size() > 0) {
-      return remaining.iterator().next();
-    } else {
-      return null;
-    }
-  }
-
-  @VisibleForTesting
-  protected void setupLocalDiskFetch(MapHost host) throws InterruptedException {
-    // Get completed maps on 'host'
-    List<InputAttemptIdentifier> srcAttempts = scheduler.getMapsForHost(host);
-    currentPartition = host.getPartitionId();
-
-    // Sanity check to catch hosts with only 'OBSOLETE' maps,
-    // especially at the tail of large jobs
-    if (srcAttempts.size() == 0) {
-      return;
-    }
-
-    if(LOG.isDebugEnabled()) {
-      LOG.debug("Fetcher " + id + " going to fetch (local disk) from " + host + " for: "
-          + srcAttempts + ", partitionId: " + currentPartition);
-    }
-
-    // List of maps to be fetched yet
-    remaining = new LinkedHashSet<InputAttemptIdentifier>(srcAttempts);
-
-    try {
-      final Iterator<InputAttemptIdentifier> iter = remaining.iterator();
-      while (iter.hasNext()) {
-        InputAttemptIdentifier srcAttemptId = iter.next();
-        MapOutput mapOutput = null;
-        try {
-          long startTime = System.currentTimeMillis();
-          Path filename = getShuffleInputFileName(srcAttemptId.getPathComponent(), null);
-
-          TezIndexRecord indexRecord = getIndexRecord(srcAttemptId.getPathComponent(),
-              currentPartition);
-
-          mapOutput = getMapOutputForDirectDiskFetch(srcAttemptId, filename, indexRecord);
-          long endTime = System.currentTimeMillis();
-          scheduler.copySucceeded(srcAttemptId, host, indexRecord.getPartLength(),
-              indexRecord.getRawLength(), (endTime - startTime), mapOutput);
-          iter.remove();
-          metrics.successFetch();
-        } catch (IOException e) {
-          if (mapOutput != null) {
-            mapOutput.abort();
-          }
-          metrics.failedFetch();
-          ioErrs.increment(1);
-          scheduler.copyFailed(srcAttemptId, host, true, false);
-          LOG.warn("Failed to read local disk output of " + srcAttemptId + " from " +
-              host.getHostIdentifier(), e);
-        }
-      }
-    } finally {
-      putBackRemainingMapOutputs(host);
-    }
-
-  }
-
-  @VisibleForTesting
-  protected Path getShuffleInputFileName(String pathComponent, String suffix)
-      throws IOException {
-    LocalDirAllocator localDirAllocator = new LocalDirAllocator(TezRuntimeFrameworkConfigs.LOCAL_DIRS);
-    suffix = suffix != null ? suffix : "";
-
-    String pathFromLocalDir = Constants.TEZ_RUNTIME_TASK_OUTPUT_DIR + Path.SEPARATOR +
-        pathComponent + Path.SEPARATOR + Constants.TEZ_RUNTIME_TASK_OUTPUT_FILENAME_STRING + suffix;
-
-    return localDirAllocator.getLocalPathToRead(pathFromLocalDir.toString(), conf);
-  }
-
-  @VisibleForTesting
-  protected TezIndexRecord getIndexRecord(String pathComponent, int partitionId)
-      throws IOException {
-    Path indexFile = getShuffleInputFileName(pathComponent,
-        Constants.TEZ_RUNTIME_TASK_OUTPUT_INDEX_SUFFIX_STRING);
-    TezSpillRecord spillRecord = new TezSpillRecord(indexFile, conf);
-    return spillRecord.getIndex(partitionId);
-  }
-
-  @VisibleForTesting
-  protected MapOutput getMapOutputForDirectDiskFetch(InputAttemptIdentifier srcAttemptId,
-                                                     Path filename, TezIndexRecord indexRecord)
-      throws IOException {
-    return MapOutput.createLocalDiskMapOutput(srcAttemptId, merger, filename,
-        indexRecord.getStartOffset(), indexRecord.getPartLength(), true);
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/tez/blob/196bcca9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/InMemoryReader.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/InMemoryReader.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/InMemoryReader.java
deleted file mode 100644
index 1492da4..0000000
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/InMemoryReader.java
+++ /dev/null
@@ -1,162 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.runtime.library.common.shuffle.impl;
-
-import java.io.DataInput;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.io.DataInputBuffer;
-import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
-import org.apache.tez.runtime.library.common.sort.impl.IFile;
-import org.apache.tez.runtime.library.common.sort.impl.IFile.Reader;
-
-/**
- * <code>IFile.InMemoryReader</code> to read map-outputs present in-memory.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-public class InMemoryReader extends Reader {
-
-  private final InputAttemptIdentifier taskAttemptId;
-  private final MergeManager merger;
-  DataInputBuffer memDataIn = new DataInputBuffer();
-  private int start;
-  private int length;
-  private int originalKeyPos;
-
-  public InMemoryReader(MergeManager merger,
-      InputAttemptIdentifier taskAttemptId, byte[] data, int start,
-      int length)
-      throws IOException {
-    super(null, length - start, null, null, null, false, 0, -1);
-    this.merger = merger;
-    this.taskAttemptId = taskAttemptId;
-
-    buffer = data;
-    bufferSize = (int) length;
-    memDataIn.reset(buffer, start, length);
-    this.start = start;
-    this.length = length;
-  }
-
-  @Override
-  public void reset(int offset) {
-    memDataIn.reset(buffer, start + offset, length);
-    bytesRead = offset;
-    eof = false;
-  }
-
-  @Override
-  public long getPosition() throws IOException {
-    // InMemoryReader does not initialize streams like Reader, so in.getPos()
-    // would not work. Instead, return the number of uncompressed bytes read,
-    // which will be correct since in-memory data is not compressed.
-    return bytesRead;
-  }
-
-  @Override
-  public long getLength() {
-    return length;
-  }
-
-  private void dumpOnError() {
-    File dumpFile = new File("../output/" + taskAttemptId + ".dump");
-    System.err.println("Dumping corrupt map-output of " + taskAttemptId +
-                       " to " + dumpFile.getAbsolutePath());
-    try {
-      FileOutputStream fos = new FileOutputStream(dumpFile);
-      fos.write(buffer, 0, bufferSize);
-      fos.close();
-    } catch (IOException ioe) {
-      System.err.println("Failed to dump map-output of " + taskAttemptId);
-    }
-  }
-
-  protected void readKeyValueLength(DataInput dIn) throws IOException {
-    super.readKeyValueLength(dIn);
-    if (currentKeyLength != IFile.RLE_MARKER) {
-      originalKeyPos = memDataIn.getPosition();
-    }
-  }
-
-  public KeyState readRawKey(DataInputBuffer key) throws IOException {
-    try {
-      if (!positionToNextRecord(memDataIn)) {
-        return KeyState.NO_KEY;
-      }
-      // Setup the key
-      int pos = memDataIn.getPosition();
-      byte[] data = memDataIn.getData();
-      if (currentKeyLength == IFile.RLE_MARKER) {
-        // get key length from original key
-        key.reset(data, originalKeyPos, originalKeyLength);
-        return KeyState.SAME_KEY;
-      }
-      key.reset(data, pos, currentKeyLength);
-      // Position for the next value
-      long skipped = memDataIn.skip(currentKeyLength);
-      if (skipped != currentKeyLength) {
-        throw new IOException("Rec# " + recNo +
-            ": Failed to skip past key of length: " +
-            currentKeyLength);
-      }
-      bytesRead += currentKeyLength;
-      return KeyState.NEW_KEY;
-    } catch (IOException ioe) {
-      dumpOnError();
-      throw ioe;
-    }
-  }
-
-  public void nextRawValue(DataInputBuffer value) throws IOException {
-    try {
-      int pos = memDataIn.getPosition();
-      byte[] data = memDataIn.getData();
-      value.reset(data, pos, currentValueLength);
-
-      // Position for the next record
-      long skipped = memDataIn.skip(currentValueLength);
-      if (skipped != currentValueLength) {
-        throw new IOException("Rec# " + recNo +
-            ": Failed to skip past value of length: " +
-            currentValueLength);
-      }
-      // Record the byte
-      bytesRead += currentValueLength;
-      ++recNo;
-    } catch (IOException ioe) {
-      dumpOnError();
-      throw ioe;
-    }
-  }
-
-  public void close() {
-    // Release
-    dataIn = null;
-    buffer = null;
-    // Inform the MergeManager
-    if (merger != null) {
-      merger.unreserve(bufferSize);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/tez/blob/196bcca9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/InMemoryWriter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/InMemoryWriter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/InMemoryWriter.java
deleted file mode 100644
index d18d363..0000000
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/InMemoryWriter.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tez.runtime.library.common.shuffle.impl;
-
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.io.BoundedByteArrayOutputStream;
-import org.apache.hadoop.io.WritableUtils;
-import org.apache.tez.runtime.library.common.sort.impl.IFile;
-import org.apache.tez.runtime.library.common.sort.impl.IFileOutputStream;
-import org.apache.tez.runtime.library.common.sort.impl.IFile.Writer;
-
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-public class InMemoryWriter extends Writer {
-  private static final Log LOG = LogFactory.getLog(InMemoryWriter.class);
-
-  // TODO Verify and fix counters if required.
-
-  public InMemoryWriter(BoundedByteArrayOutputStream arrayStream) {
-    super(null, null);
-    this.out =
-      new DataOutputStream(new IFileOutputStream(arrayStream));
-  }
-
-  public void append(Object key, Object value) throws IOException {
-    throw new UnsupportedOperationException
-    ("InMemoryWriter.append(K key, V value");
-  }
-
-  public void close() throws IOException {
-    // Write EOF_MARKER for key/value length
-    WritableUtils.writeVInt(out, IFile.EOF_MARKER);
-    WritableUtils.writeVInt(out, IFile.EOF_MARKER);
-
-    // Close the stream
-    out.close();
-    out = null;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/tez/blob/196bcca9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MapHost.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MapHost.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MapHost.java
deleted file mode 100644
index aa7309a..0000000
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MapHost.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tez.runtime.library.common.shuffle.impl;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
-
-@Private
-class MapHost {
-  
-  public static enum State {
-    IDLE,               // No map outputs available
-    BUSY,               // Map outputs are being fetched
-    PENDING,            // Known map outputs which need to be fetched
-    PENALIZED           // Host penalized due to shuffle failures
-  }
-  
-  private State state = State.IDLE;
-  private final String hostIdentifier;
-  private final int partitionId;
-  private final String baseUrl;
-  private final String identifier;
-  // Tracks attempt IDs
-  private List<InputAttemptIdentifier> maps = new ArrayList<InputAttemptIdentifier>();
-  
-  public MapHost(int partitionId, String hostPort, String baseUrl) {
-    this.partitionId = partitionId;
-    this.hostIdentifier = hostPort;
-    this.baseUrl = baseUrl;
-    this.identifier = createIdentifier(hostPort, partitionId);
-  }
-  
-  public static String createIdentifier(String hostName, int partitionId) {
-    return hostName + ":" + Integer.toString(partitionId);
-  }
-  
-  public String getIdentifier() {
-    return identifier;
-  }
-  
-  public int getPartitionId() {
-    return partitionId;
-  }
-
-  public State getState() {
-    return state;
-  }
-
-  public String getHostIdentifier() {
-    return hostIdentifier;
-  }
-
-  public String getBaseUrl() {
-    return baseUrl;
-  }
-
-  public synchronized void addKnownMap(InputAttemptIdentifier srcAttempt) {
-    maps.add(srcAttempt);
-    if (state == State.IDLE) {
-      state = State.PENDING;
-    }
-  }
-
-  public synchronized List<InputAttemptIdentifier> getAndClearKnownMaps() {
-    List<InputAttemptIdentifier> currentKnownMaps = maps;
-    maps = new ArrayList<InputAttemptIdentifier>();
-    return currentKnownMaps;
-  }
-  
-  public synchronized void markBusy() {
-    state = State.BUSY;
-  }
-  
-  public synchronized void markPenalized() {
-    state = State.PENALIZED;
-  }
-  
-  public synchronized int getNumKnownMapOutputs() {
-    return maps.size();
-  }
-
-  /**
-   * Called when the node is done with its penalty or done copying.
-   * @return the host's new state
-   */
-  public synchronized State markAvailable() {
-    if (maps.isEmpty()) {
-      state = State.IDLE;
-    } else {
-      state = State.PENDING;
-    }
-    return state;
-  }
-  
-  @Override
-  public String toString() {
-    return hostIdentifier;
-  }
-  
-  /**
-   * Mark the host as penalized
-   */
-  public synchronized void penalize() {
-    state = State.PENALIZED;
-  }
-}

http://git-wip-us.apache.org/repos/asf/tez/blob/196bcca9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MapOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MapOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MapOutput.java
deleted file mode 100644
index 389aae9..0000000
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MapOutput.java
+++ /dev/null
@@ -1,234 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tez.runtime.library.common.shuffle.impl;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.Comparator;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BoundedByteArrayOutputStream;
-import org.apache.hadoop.io.FileChunk;
-import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
-import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutputFiles;
-
-
-class MapOutput {
-  private static final Log LOG = LogFactory.getLog(MapOutput.class);
-  private static AtomicInteger ID = new AtomicInteger(0);
-  
-  public static enum Type {
-    WAIT,
-    MEMORY,
-    DISK,
-    DISK_DIRECT
-  }
-
-  private final int id;
-  private final Type type;
-  private InputAttemptIdentifier attemptIdentifier;
-  private final long size;
-
-  private final boolean primaryMapOutput;
-  private final MergeManager merger;
-
-  // MEMORY
-  private final byte[] memory;
-  private BoundedByteArrayOutputStream byteStream;
-
-  // DISK
-  private final FileSystem localFS;
-  private final Path tmpOutputPath;
-  private final FileChunk outputPath;
-  private OutputStream disk;
-
-  private MapOutput(Type type, InputAttemptIdentifier attemptIdentifier, MergeManager merger,
-                    long size, Path outputPath, long offset, boolean primaryMapOutput,
-                    FileSystem fs, Path tmpOutputPath) {
-    this.id = ID.incrementAndGet();
-    this.type = type;
-    this.attemptIdentifier = attemptIdentifier;
-    this.merger = merger;
-    this.primaryMapOutput = primaryMapOutput;
-
-    this.localFS = fs;
-    this.size = size;
-
-    // Other type specific values
-
-    if (type == Type.MEMORY) {
-      // since we are passing an int from createMemoryMapOutput, its safe to cast to int
-      this.byteStream = new BoundedByteArrayOutputStream((int)size);
-      this.memory = byteStream.getBuffer();
-    } else {
-      this.byteStream = null;
-      this.memory = null;
-    }
-
-    this.tmpOutputPath = tmpOutputPath;
-    this.disk = null;
-
-    if (type == Type.DISK || type == Type.DISK_DIRECT) {
-      boolean preserve = (type == Type.DISK_DIRECT); // type disk are temp files.
-      this.outputPath = new FileChunk(outputPath, offset, size, preserve);
-    } else {
-      this.outputPath = null;
-    }
-
-  }
-
-  public static MapOutput createDiskMapOutput(InputAttemptIdentifier attemptIdentifier,
-                                              MergeManager merger, long size, Configuration conf,
-                                              int fetcher, boolean primaryMapOutput,
-                                              TezTaskOutputFiles mapOutputFile) throws
-      IOException {
-    FileSystem fs = FileSystem.getLocal(conf);
-    Path outputpath = mapOutputFile.getInputFileForWrite(
-        attemptIdentifier.getInputIdentifier().getInputIndex(), size);
-    Path tmpOuputPath = outputpath.suffix(String.valueOf(fetcher));
-    long offset = 0;
-
-    MapOutput mapOutput = new MapOutput(Type.DISK, attemptIdentifier, merger, size, outputpath, offset,
-        primaryMapOutput, fs, tmpOuputPath);
-    mapOutput.disk = mapOutput.localFS.create(tmpOuputPath);
-
-    return mapOutput;
-  }
-
-  public static MapOutput createLocalDiskMapOutput(InputAttemptIdentifier attemptIdentifier,
-                                                   MergeManager merger, Path path,  long offset,
-                                                   long size, boolean primaryMapOutput)  {
-    return new MapOutput(Type.DISK_DIRECT, attemptIdentifier, merger, size, path, offset,
-        primaryMapOutput, null, null);
-  }
-
-  public static MapOutput createMemoryMapOutput(InputAttemptIdentifier attemptIdentifier,
-                                                MergeManager merger, int size,
-                                                boolean primaryMapOutput)  {
-    return new MapOutput(Type.MEMORY, attemptIdentifier, merger, size, null, -1, primaryMapOutput,
-        null, null);
-  }
-
-  public static MapOutput createWaitMapOutput(InputAttemptIdentifier attemptIdentifier) {
-    return new MapOutput(Type.WAIT, attemptIdentifier, null, -1, null, -1, false, null, null);
-  }
-
-  public boolean isPrimaryMapOutput() {
-    return primaryMapOutput;
-  }
-
-  @Override
-  public boolean equals(Object obj) {
-    if (obj instanceof MapOutput) {
-      return id == ((MapOutput)obj).id;
-    }
-    return false;
-  }
-
-  @Override
-  public int hashCode() {
-    return id;
-  }
-
-  public FileChunk getOutputPath() {
-    return outputPath;
-  }
-
-  public byte[] getMemory() {
-    return memory;
-  }
-
-  public BoundedByteArrayOutputStream getArrayStream() {
-    return byteStream;
-  }
-  
-  public OutputStream getDisk() {
-    return disk;
-  }
-
-  public InputAttemptIdentifier getAttemptIdentifier() {
-    return this.attemptIdentifier;
-  }
-
-  public Type getType() {
-    return type;
-  }
-
-  public long getSize() {
-    return size;
-  }
-
-  public void commit() throws IOException {
-    if (type == Type.MEMORY) {
-      merger.closeInMemoryFile(this);
-    } else if (type == Type.DISK) {
-      localFS.rename(tmpOutputPath, outputPath.getPath());
-      merger.closeOnDiskFile(outputPath);
-    } else if (type == Type.DISK_DIRECT) {
-      merger.closeOnDiskFile(outputPath);
-    } else {
-      throw new IOException("Cannot commit MapOutput of type WAIT!");
-    }
-  }
-  
-  public void abort() {
-    if (type == Type.MEMORY) {
-      merger.unreserve(memory.length);
-    } else if (type == Type.DISK) {
-      try {
-        localFS.delete(tmpOutputPath, false);
-      } catch (IOException ie) {
-        LOG.info("failure to clean up " + tmpOutputPath, ie);
-      }
-    } else if (type == Type.DISK_DIRECT) { //nothing to do.
-    } else {
-      throw new IllegalArgumentException
-                   ("Cannot commit MapOutput with of type WAIT!");
-    }
-  }
-  
-  public String toString() {
-    return "MapOutput( AttemptIdentifier: " + attemptIdentifier + ", Type: " + type + ")";
-  }
-  
-  public static class MapOutputComparator 
-  implements Comparator<MapOutput> {
-    public int compare(MapOutput o1, MapOutput o2) {
-      if (o1.id == o2.id) { 
-        return 0;
-      }
-      
-      if (o1.size < o2.size) {
-        return -1;
-      } else if (o1.size > o2.size) {
-        return 1;
-      }
-      
-      if (o1.id < o2.id) {
-        return -1;
-      } else {
-        return 1;
-      }
-    }
-  }
-}


Mime
View raw message