tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject git commit: TEZ-791. Remove ShuffleHandler and related classes from tez-library. (sseth)
Date Mon, 03 Feb 2014 22:31:10 GMT
Updated Branches:
  refs/heads/master 76175a283 -> 3ab7310f7


TEZ-791. Remove ShuffleHandler and related classes from tez-library.
(sseth)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/3ab7310f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/3ab7310f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/3ab7310f

Branch: refs/heads/master
Commit: 3ab7310f794c316f68df8d3291e529e80efc9c2c
Parents: 76175a2
Author: Siddharth Seth <sseth@apache.org>
Authored: Mon Feb 3 14:30:43 2014 -0800
Committer: Siddharth Seth <sseth@apache.org>
Committed: Mon Feb 3 14:30:43 2014 -0800

----------------------------------------------------------------------
 pom.xml                                         |   5 -
 .../apache/tez/dag/api/TezConfiguration.java    |   7 +
 .../app/rm/container/AMContainerHelpers.java    |  28 +-
 tez-runtime-library/pom.xml                     |   4 -
 .../input/BroadcastShuffleManager.java          |   4 +-
 .../library/common/shuffle/impl/Shuffle.java    |   4 +-
 .../common/shuffle/server/ShuffleHandler.java   | 572 -------------------
 .../sort/impl/dflt/InMemoryShuffleSorter.java   | 126 ----
 .../sort/impl/dflt/SortBufferInputStream.java   | 271 ---------
 .../library/output/InMemorySortedOutput.java    |  81 ---
 10 files changed, 36 insertions(+), 1066 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3ab7310f/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index ba5a3b9..62248bd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -253,11 +253,6 @@
         <version>1.0.4.1</version>
       </dependency>
       <dependency>
-        <groupId>io.netty</groupId>
-        <artifactId>netty</artifactId>
-        <version>3.9.0.Final</version>
-      </dependency>
-      <dependency>
         <groupId>com.google.guava</groupId>
         <artifactId>guava</artifactId>
         <version>15.0</version>

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3ab7310f/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index 18fac77..c1154a1 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -18,6 +18,7 @@
 
 package org.apache.tez.dag.api;
 
+import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 
 public class TezConfiguration extends Configuration {
@@ -283,4 +284,10 @@ public class TezConfiguration extends Configuration {
    */
   public static final String TEZ_PROFILE_JVM_OPTS = TEZ_PREFIX + "profile.jvm.opts";
 
+  /**
+   * The service id for the NodeManager plugin used to share intermediate data
+   * between vertices.
+   */
+  @Private
+  public static final String TEZ_SHUFFLE_HANDLER_SERVICE_ID = "mapreduce_shuffle";
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3ab7310f/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
index 7a74753..d65bbf1 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
@@ -42,12 +43,13 @@ import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
 import org.apache.hadoop.yarn.api.records.URL;
 import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.tez.common.security.JobTokenIdentifier;
 import org.apache.tez.common.security.TokenCache;
+import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.utils.TezRuntimeChildJVM;
-import org.apache.tez.runtime.library.common.shuffle.server.ShuffleHandler;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -120,8 +122,8 @@ public class AMContainerHelpers {
 
       // Add shuffle token
       LOG.info("Putting shuffle token in serviceData");
-      serviceData.put(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID,
-          ShuffleHandler.serializeServiceData(TokenCache.getSessionToken(containerCredentials)));
+      serviceData.put(TezConfiguration.TEZ_SHUFFLE_HANDLER_SERVICE_ID,
+          serializeServiceData(TokenCache.getSessionToken(containerCredentials)));
     } catch (IOException e) {
       throw new TezUncheckedException(e);
     }
@@ -198,5 +200,25 @@ public class AMContainerHelpers {
 
     return container;
   }
+  
+  /**
+   * A helper function to serialize the JobTokenIdentifier to be sent to the
+   * ShuffleHandler as ServiceData.
+   * 
+   * *NOTE* This is a copy of what is done by the MapReduce ShuffleHandler. Not using that directly
+   * to avoid a dependency on mapreduce.
+   * 
+   * @param jobToken
+   *          the job token to be used for authentication of shuffle data
+   *          requests.
+   * @return the serialized version of the jobToken.
+   */
+  private static ByteBuffer serializeServiceData(Token<JobTokenIdentifier> jobToken)
+      throws IOException {
+    // TODO these bytes should be versioned
+    DataOutputBuffer jobToken_dob = new DataOutputBuffer();
+    jobToken.write(jobToken_dob);
+    return ByteBuffer.wrap(jobToken_dob.getData(), 0, jobToken_dob.getLength());
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3ab7310f/tez-runtime-library/pom.xml
----------------------------------------------------------------------
diff --git a/tez-runtime-library/pom.xml b/tez-runtime-library/pom.xml
index e895e8e..c56a8b2 100644
--- a/tez-runtime-library/pom.xml
+++ b/tez-runtime-library/pom.xml
@@ -51,10 +51,6 @@
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
     </dependency>
-    <dependency>
-      <groupId>io.netty</groupId>
-      <artifactId>netty</artifactId>
-    </dependency>
   </dependencies>
 
   <build>

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3ab7310f/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java
index d6af464..47eaaf6 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.DefaultCodec;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.TezInputContext;
@@ -54,7 +55,6 @@ import org.apache.tez.runtime.library.common.ConfigUtils;
 import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
 import org.apache.tez.runtime.library.common.InputIdentifier;
 import org.apache.tez.runtime.library.common.TezRuntimeUtils;
-import org.apache.tez.runtime.library.common.shuffle.server.ShuffleHandler;
 import org.apache.tez.runtime.library.shuffle.common.FetchResult;
 import org.apache.tez.runtime.library.shuffle.common.FetchedInput;
 import org.apache.tez.runtime.library.shuffle.common.FetchedInputAllocator;
@@ -193,7 +193,7 @@ public class BroadcastShuffleManager implements FetcherCallback {
     
     this.shuffleSecret = ShuffleUtils
         .getJobTokenSecretFromTokenBytes(inputContext
-            .getServiceConsumerMetaData(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID));
+            .getServiceConsumerMetaData(TezConfiguration.TEZ_SHUFFLE_HANDLER_SERVICE_ID));
     
     this.connectionTimeout = conf.getInt(
         TezJobConfig.TEZ_RUNTIME_SHUFFLE_CONNECT_TIMEOUT,

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3ab7310f/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java
index 766ffea..80f6627 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java
@@ -39,13 +39,13 @@ import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.tez.common.TezJobConfig;
 import org.apache.tez.common.counters.TaskCounter;
 import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.TezInputContext;
 import org.apache.tez.runtime.library.common.ConfigUtils;
 import org.apache.tez.runtime.library.common.TezRuntimeUtils;
 import org.apache.tez.runtime.library.common.combine.Combiner;
-import org.apache.tez.runtime.library.common.shuffle.server.ShuffleHandler;
 import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator;
 import org.apache.tez.runtime.library.shuffle.common.ShuffleUtils;
 
@@ -86,7 +86,7 @@ public class Shuffle implements ExceptionReporter {
     
     this.jobTokenSecret = ShuffleUtils
         .getJobTokenSecretFromTokenBytes(inputContext
-            .getServiceConsumerMetaData(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID));
+            .getServiceConsumerMetaData(TezConfiguration.TEZ_SHUFFLE_HANDLER_SERVICE_ID));
     
     if (ConfigUtils.isIntermediateInputCompressed(conf)) {
       Class<? extends CompressionCodec> codecClass =

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3ab7310f/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/server/ShuffleHandler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/server/ShuffleHandler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/server/ShuffleHandler.java
deleted file mode 100644
index aecc9e4..0000000
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/server/ShuffleHandler.java
+++ /dev/null
@@ -1,572 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.runtime.library.common.shuffle.server;
-
-import static org.jboss.netty.buffer.ChannelBuffers.wrappedBuffer;
-import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
-import static org.jboss.netty.handler.codec.http.HttpMethod.GET;
-import static org.jboss.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST;
-import static org.jboss.netty.handler.codec.http.HttpResponseStatus.FORBIDDEN;
-import static org.jboss.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
-import static org.jboss.netty.handler.codec.http.HttpResponseStatus.METHOD_NOT_ALLOWED;
-import static org.jboss.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
-import static org.jboss.netty.handler.codec.http.HttpResponseStatus.OK;
-import static org.jboss.netty.handler.codec.http.HttpResponseStatus.UNAUTHORIZED;
-import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.URL;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-
-import javax.crypto.SecretKey;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.DataInputByteBuffer;
-import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.metrics2.MetricsSystem;
-import org.apache.hadoop.metrics2.annotation.Metric;
-import org.apache.hadoop.metrics2.annotation.Metrics;
-import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
-import org.apache.hadoop.metrics2.lib.MutableCounterInt;
-import org.apache.hadoop.metrics2.lib.MutableCounterLong;
-import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
-import org.apache.hadoop.security.ssl.SSLFactory;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
-import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
-import org.apache.hadoop.yarn.server.api.AuxiliaryService;
-import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.common.security.JobTokenIdentifier;
-import org.apache.tez.runtime.api.TezOutputContext;
-import org.apache.tez.runtime.library.common.security.JobTokenSecretManager;
-import org.apache.tez.runtime.library.common.security.SecureShuffleUtils;
-import org.apache.tez.runtime.library.common.shuffle.impl.ShuffleHeader;
-import org.apache.tez.runtime.library.common.sort.impl.ExternalSorter;
-import org.apache.tez.runtime.library.shuffle.common.ShuffleUtils;
-import org.jboss.netty.bootstrap.ServerBootstrap;
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFactory;
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.ChannelFutureListener;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.channel.Channels;
-import org.jboss.netty.channel.ExceptionEvent;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
-import org.jboss.netty.channel.group.ChannelGroup;
-import org.jboss.netty.channel.group.DefaultChannelGroup;
-import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
-import org.jboss.netty.handler.codec.frame.TooLongFrameException;
-import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
-import org.jboss.netty.handler.codec.http.HttpChunkAggregator;
-import org.jboss.netty.handler.codec.http.HttpRequest;
-import org.jboss.netty.handler.codec.http.HttpRequestDecoder;
-import org.jboss.netty.handler.codec.http.HttpResponse;
-import org.jboss.netty.handler.codec.http.HttpResponseEncoder;
-import org.jboss.netty.handler.codec.http.HttpResponseStatus;
-import org.jboss.netty.handler.codec.http.QueryStringDecoder;
-import org.jboss.netty.handler.ssl.SslHandler;
-import org.jboss.netty.handler.stream.ChunkedStream;
-import org.jboss.netty.handler.stream.ChunkedWriteHandler;
-import org.jboss.netty.util.CharsetUtil;
-
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
-public class ShuffleHandler extends AuxiliaryService {
-
-  private static final Log LOG = LogFactory.getLog(ShuffleHandler.class);
-  
-  public static final String SHUFFLE_MANAGE_OS_CACHE = "mapreduce.shuffle.manage.os.cache";
-  public static final boolean DEFAULT_SHUFFLE_MANAGE_OS_CACHE = true;
-
-  public static final String SHUFFLE_READAHEAD_BYTES = "mapreduce.shuffle.readahead.bytes";
-  public static final int DEFAULT_SHUFFLE_READAHEAD_BYTES = 4 * 1024 * 1024;
-
-  private int port;
-  private ChannelFactory selector;
-  private final ChannelGroup accepted = new DefaultChannelGroup();
-  private HttpPipelineFactory pipelineFact;
-  private int sslFileBufferSize;
-
-  public static final String MAPREDUCE_SHUFFLE_SERVICEID =
-      "mapreduce_shuffle";
-
-  private static final Map<String,String> userRsrc =
-    new ConcurrentHashMap<String,String>();
-  private static final JobTokenSecretManager secretManager =
-    new JobTokenSecretManager();
-  private SecretKey tokenSecret;
-
-  public static final String SHUFFLE_PORT_CONFIG_KEY = "mapreduce.shuffle.port";
-  public static final int DEFAULT_SHUFFLE_PORT = 8080;
-
-  public static final String SUFFLE_SSL_FILE_BUFFER_SIZE_KEY =
-    "mapreduce.shuffle.ssl.file.buffer.size";
-
-  public static final int DEFAULT_SUFFLE_SSL_FILE_BUFFER_SIZE = 60 * 1024;
-
-  private ExternalSorter sorter;
-  
-  @Metrics(about="Shuffle output metrics", context="mapred")
-  static class ShuffleMetrics implements ChannelFutureListener {
-    @Metric("Shuffle output in bytes")
-        MutableCounterLong shuffleOutputBytes;
-    @Metric("# of failed shuffle outputs")
-        MutableCounterInt shuffleOutputsFailed;
-    @Metric("# of succeeeded shuffle outputs")
-        MutableCounterInt shuffleOutputsOK;
-    @Metric("# of current shuffle connections")
-        MutableGaugeInt shuffleConnections;
-
-    @Override
-    public void operationComplete(ChannelFuture future) throws Exception {
-      if (future.isSuccess()) {
-        shuffleOutputsOK.incr();
-      } else {
-        shuffleOutputsFailed.incr();
-      }
-      shuffleConnections.decr();
-    }
-  }
-
-  final ShuffleMetrics metrics;
-
-  ShuffleHandler(MetricsSystem ms) {
-    super("httpshuffle");
-    metrics = ms.register(new ShuffleMetrics());
-  }
-
-  public ShuffleHandler(ExternalSorter sorter) {
-    this(DefaultMetricsSystem.instance());
-    this.sorter = sorter;
-  }
-
-  /**
-   * Serialize the shuffle port into a ByteBuffer for use later on.
-   * @param port the port to be sent to the ApplciationMaster
-   * @return the serialized form of the port.
-   */
-  public static ByteBuffer serializeMetaData(int port) throws IOException {
-    //TODO these bytes should be versioned
-    DataOutputBuffer port_dob = new DataOutputBuffer();
-    port_dob.writeInt(port);
-    return ByteBuffer.wrap(port_dob.getData(), 0, port_dob.getLength());
-  }
-
-  /**
-   * A helper function to deserialize the metadata returned by ShuffleHandler.
-   * @param meta the metadata returned by the ShuffleHandler
-   * @return the port the Shuffle Handler is listening on to serve shuffle data.
-   */
-  public static int deserializeMetaData(ByteBuffer meta) throws IOException {
-    //TODO this should be returning a class not just an int
-    DataInputByteBuffer in = new DataInputByteBuffer();
-    in.reset(meta);
-    int port = in.readInt();
-    return port;
-  }
-
-  /**
-   * A helper function to serialize the JobTokenIdentifier to be sent to the
-   * ShuffleHandler as ServiceData.
-   * @param jobToken the job token to be used for authentication of
-   * shuffle data requests.
-   * @return the serialized version of the jobToken.
-   */
-  public static ByteBuffer serializeServiceData(Token<JobTokenIdentifier> jobToken) throws IOException {
-    //TODO these bytes should be versioned
-    DataOutputBuffer jobToken_dob = new DataOutputBuffer();
-    jobToken.write(jobToken_dob);
-    return ByteBuffer.wrap(jobToken_dob.getData(), 0, jobToken_dob.getLength());
-  }
-
-  static Token<JobTokenIdentifier> deserializeServiceData(ByteBuffer secret) throws IOException {
-    DataInputByteBuffer in = new DataInputByteBuffer();
-    in.reset(secret);
-    Token<JobTokenIdentifier> jt = new Token<JobTokenIdentifier>();
-    jt.readFields(in);
-    return jt;
-  }
-
-  
-  @Override
-  public void initializeApplication(
-      ApplicationInitializationContext initAppContext) {
-    // TODO these bytes should be versioned
-    try {
-      String user = initAppContext.getUser();
-      ApplicationId appId = initAppContext.getApplicationId();
-      ByteBuffer secret = initAppContext.getApplicationDataForService();
-      Token<JobTokenIdentifier> jt = deserializeServiceData(secret);
-      // TODO: Once SHuffle is out of NM, this can use MR APIs
-      userRsrc.put(appId.toString(), user);
-      LOG.info("Added token for " + appId.toString());
-      secretManager.addTokenForJob(appId.toString(), jt);
-    } catch (IOException e) {
-      LOG.error("Error during initApp", e);
-      // TODO add API to AuxiliaryServices to report failures
-    }
-  }
-
-  @Override
-  public void stopApplication(ApplicationTerminationContext context) {
-    ApplicationId appId = context.getApplicationId();
-    secretManager.removeTokenForJob(appId.toString());
-    userRsrc.remove(appId.toString());
-  }
-
-  public void initialize(TezOutputContext outputContext, Configuration conf) throws IOException {
-    this.init(new Configuration(conf));
-    tokenSecret = ShuffleUtils.getJobTokenSecretFromTokenBytes(outputContext.getServiceConsumerMetaData(MAPREDUCE_SHUFFLE_SERVICEID));
-  }
-
-  @Override
-  public synchronized void serviceInit(Configuration conf) {
-    ThreadFactory bossFactory = new ThreadFactoryBuilder()
-      .setNameFormat("ShuffleHandler Netty Boss #%d")
-      .build();
-    ThreadFactory workerFactory = new ThreadFactoryBuilder()
-      .setNameFormat("ShuffleHandler Netty Worker #%d")
-      .build();
-    
-    selector = new NioServerSocketChannelFactory(
-        Executors.newCachedThreadPool(bossFactory),
-        Executors.newCachedThreadPool(workerFactory));    
-  }
-
-  // TODO change AbstractService to throw InterruptedException
-  @Override
-  public synchronized void serviceStart() {
-    Configuration conf = getConfig();
-    ServerBootstrap bootstrap = new ServerBootstrap(selector);
-    try {
-      pipelineFact = new HttpPipelineFactory(conf);
-    } catch (Exception ex) {
-      throw new RuntimeException(ex);
-    }
-    bootstrap.setPipelineFactory(pipelineFact);
-    // Let OS pick the port
-    Channel ch = bootstrap.bind(new InetSocketAddress(0));
-    accepted.add(ch);
-    port = ((InetSocketAddress)ch.getLocalAddress()).getPort();
-    conf.set(SHUFFLE_PORT_CONFIG_KEY, Integer.toString(port));
-    pipelineFact.SHUFFLE.setPort(port);
-    LOG.info(getName() + " listening on port " + port);
-
-    sslFileBufferSize = conf.getInt(SUFFLE_SSL_FILE_BUFFER_SIZE_KEY,
-                                    DEFAULT_SUFFLE_SSL_FILE_BUFFER_SIZE);
-  }
-
-  @Override
-  public synchronized void serviceStop() {
-    accepted.close().awaitUninterruptibly(10, TimeUnit.SECONDS);
-    ServerBootstrap bootstrap = new ServerBootstrap(selector);
-    bootstrap.releaseExternalResources();
-    pipelineFact.destroy();
-  }
-
-  @Override
-  public synchronized ByteBuffer getMetaData() {
-    try {
-      return serializeMetaData(port); 
-    } catch (IOException e) {
-      LOG.error("Error during getMeta", e);
-      // TODO add API to AuxiliaryServices to report failures
-      return null;
-    }
-  }
-
-  class HttpPipelineFactory implements ChannelPipelineFactory {
-
-    final Shuffle SHUFFLE;
-    private SSLFactory sslFactory;
-
-    public HttpPipelineFactory(Configuration conf) throws Exception {
-      SHUFFLE = new Shuffle(conf);
-      if (conf.getBoolean(TezJobConfig.TEZ_RUNTIME_SHUFFLE_ENABLE_SSL,
-              TezJobConfig.DEFAULT_TEZ_RUNTIME_SHUFFLE_ENABLE_SSL)) {
-        sslFactory = new SSLFactory(SSLFactory.Mode.SERVER, conf);
-        sslFactory.init();
-      }
-    }
-
-    public void destroy() {
-      if (sslFactory != null) {
-        sslFactory.destroy();
-      }
-    }
-
-    @Override
-    public ChannelPipeline getPipeline() throws Exception {
-      ChannelPipeline pipeline = Channels.pipeline();
-      if (sslFactory != null) {
-        pipeline.addLast("ssl", new SslHandler(sslFactory.createSSLEngine()));
-      }
-      pipeline.addLast("decoder", new HttpRequestDecoder());
-      pipeline.addLast("aggregator", new HttpChunkAggregator(1 << 16));
-      pipeline.addLast("encoder", new HttpResponseEncoder());
-      pipeline.addLast("chunking", new ChunkedWriteHandler());
-      pipeline.addLast("shuffle", SHUFFLE);
-      return pipeline;
-      // TODO factor security manager into pipeline
-      // TODO factor out encode/decode to permit binary shuffle
-      // TODO factor out decode of index to permit alt. models
-    }
-
-  }
-
-  class Shuffle extends SimpleChannelUpstreamHandler {
-
-    private final Configuration conf;
-    private int port;
-
-    public Shuffle(Configuration conf) {
-      this.conf = conf;
-      this.port = conf.getInt(SHUFFLE_PORT_CONFIG_KEY, DEFAULT_SHUFFLE_PORT);
-    }
-    
-    public void setPort(int port) {
-      this.port = port;
-    }
-
-    private List<String> splitMaps(List<String> mapq) {
-      if (null == mapq) {
-        return null;
-      }
-      final List<String> ret = new ArrayList<String>();
-      for (String s : mapq) {
-        Collections.addAll(ret, s.split(","));
-      }
-      return ret;
-    }
-
-    @Override
-    public void messageReceived(ChannelHandlerContext ctx, MessageEvent evt)
-        throws Exception {
-      HttpRequest request = (HttpRequest) evt.getMessage();
-      if (request.getMethod() != GET) {
-          sendError(ctx, METHOD_NOT_ALLOWED);
-          return;
-      }
-      // Check whether the shuffle version is compatible
-      if (!ShuffleHeader.DEFAULT_HTTP_HEADER_NAME.equals(
-          request.getHeader(ShuffleHeader.HTTP_HEADER_NAME))
-          || !ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION.equals(
-              request.getHeader(ShuffleHeader.HTTP_HEADER_VERSION))) {
-        sendError(ctx, "Incompatible shuffle request version", BAD_REQUEST);
-      }
-      final Map<String,List<String>> q =
-        new QueryStringDecoder(request.getUri()).getParameters();
-      final List<String> mapIds = splitMaps(q.get("map"));
-      final List<String> reduceQ = q.get("reduce");
-      final List<String> jobQ = q.get("job");
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("RECV: " + request.getUri() +
-            "\n  mapId: " + mapIds +
-            "\n  reduceId: " + reduceQ +
-            "\n  jobId: " + jobQ);
-      }
-
-      if (mapIds == null || reduceQ == null || jobQ == null) {
-        sendError(ctx, "Required param job, map and reduce", BAD_REQUEST);
-        return;
-      }
-      if (reduceQ.size() != 1 || jobQ.size() != 1) {
-        sendError(ctx, "Too many job/reduce parameters", BAD_REQUEST);
-        return;
-      }
-      int reduceId;
-      String jobId;
-      try {
-        reduceId = Integer.parseInt(reduceQ.get(0));
-        jobId = jobQ.get(0);
-      } catch (NumberFormatException e) {
-        sendError(ctx, "Bad reduce parameter", BAD_REQUEST);
-        return;
-      } catch (IllegalArgumentException e) {
-        sendError(ctx, "Bad job parameter", BAD_REQUEST);
-        return;
-      }
-
-      final String reqUri = request.getUri();
-      if (null == reqUri) {
-        // TODO? add upstream?
-        sendError(ctx, FORBIDDEN);
-        return;
-      }
-      HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
-      try {
-        verifyRequest(jobId, ctx, request, response,
-            new URL("http", "", this.port, reqUri));
-      } catch (IOException e) {
-        LOG.warn("Shuffle failure ", e);
-        sendError(ctx, e.getMessage(), UNAUTHORIZED);
-        return;
-      }
-
-      Channel ch = evt.getChannel();
-      ch.write(response);
-      // TODO refactor the following into the pipeline
-      ChannelFuture lastMap = null;
-      for (String mapId : mapIds) {
-        try {
-          // TODO: Error handling - validate mapId via TezTaskAttemptId.forName
-          
-          // TODO NEWTEZ Fix this. TaskAttemptId is no longer valid. mapId validation will not work anymore.
-//          if (!mapId.equals(sorter.getTaskAttemptId().toString())) {
-//            String errorMessage =
-//                "Illegal shuffle request mapId: " + mapId
-//                    + " while actual mapId is " + sorter.getTaskAttemptId(); 
-//            LOG.warn(errorMessage);
-//            sendError(ctx, errorMessage, BAD_REQUEST);
-//            return;
-//          }
-
-          lastMap =
-            sendMapOutput(ctx, ch, userRsrc.get(jobId), jobId, mapId, reduceId);
-          if (null == lastMap) {
-            sendError(ctx, NOT_FOUND);
-            return;
-          }
-        } catch (IOException e) {
-          LOG.error("Shuffle error ", e);
-          sendError(ctx, e.getMessage(), INTERNAL_SERVER_ERROR);
-          return;
-        }
-      }
-      lastMap.addListener(metrics);
-      lastMap.addListener(ChannelFutureListener.CLOSE);
-    }
-
-    private void verifyRequest(String appid, ChannelHandlerContext ctx,
-        HttpRequest request, HttpResponse response, URL requestUri)
-        throws IOException {
-      if (null == tokenSecret) {
-        LOG.info("Request for unknown token " + appid);
-        throw new IOException("could not find jobid");
-      }
-      // string to encrypt
-      String enc_str = SecureShuffleUtils.buildMsgFrom(requestUri);
-      // hash from the fetcher
-      String urlHashStr =
-        request.getHeader(SecureShuffleUtils.HTTP_HEADER_URL_HASH);
-      if (urlHashStr == null) {
-        LOG.info("Missing header hash for " + appid);
-        throw new IOException("fetcher cannot be authenticated");
-      }
-      if (LOG.isDebugEnabled()) {
-        int len = urlHashStr.length();
-        LOG.debug("verifying request. enc_str=" + enc_str + "; hash=..." +
-            urlHashStr.substring(len-len/2, len-1));
-      }
-      // verify - throws exception
-      SecureShuffleUtils.verifyReply(urlHashStr, enc_str, tokenSecret);
-      // verification passed - encode the reply
-      String reply =
-        SecureShuffleUtils.generateHash(urlHashStr.getBytes(), tokenSecret);
-      response.setHeader(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH, reply);
-      addVersionToHeader(response);
-      if (LOG.isDebugEnabled()) {
-        int len = reply.length();
-        LOG.debug("Fetcher request verfied. enc_str=" + enc_str + ";reply=" +
-            reply.substring(len-len/2, len-1));
-      }
-    }
-
-    protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx, Channel ch,
-        String user, String jobId, String mapId, int reduce)
-        throws IOException {
-      final ShuffleHeader header = sorter.getShuffleHeader(reduce);
-      final DataOutputBuffer dob = new DataOutputBuffer();
-      header.write(dob);
-      ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength()));
-
-      ChannelFuture writeFuture =
-          ch.write(
-              new ChunkedStream(
-                  sorter.getSortedStream(reduce), sslFileBufferSize
-                  )
-              );
-      metrics.shuffleConnections.incr();
-      metrics.shuffleOutputBytes.incr(header.getCompressedLength()); // optimistic
-      return writeFuture;
-    }
-
-    private void sendError(ChannelHandlerContext ctx,
-        HttpResponseStatus status) {
-      sendError(ctx, "", status);
-    }
-
-    private void sendError(ChannelHandlerContext ctx, String message,
-        HttpResponseStatus status) {
-      HttpResponse response = new DefaultHttpResponse(HTTP_1_1, status);
-      response.setHeader(CONTENT_TYPE, "text/plain; charset=UTF-8");
-      addVersionToHeader(response);
-      response.setContent(
-        ChannelBuffers.copiedBuffer(message, CharsetUtil.UTF_8));
-      // Close the connection as soon as the error message is sent.
-      ctx.getChannel().write(response).addListener(ChannelFutureListener.CLOSE);
-    }
-    
-    private void addVersionToHeader(HttpResponse response) {
-      // Put shuffle version into http header
-      response.setHeader(ShuffleHeader.HTTP_HEADER_NAME,
-          ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
-      response.setHeader(ShuffleHeader.HTTP_HEADER_VERSION,
-          ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);      
-    }
-
-    @Override
-    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
-        throws Exception {
-      Channel ch = e.getChannel();
-      Throwable cause = e.getCause();
-      if (cause instanceof TooLongFrameException) {
-        sendError(ctx, BAD_REQUEST);
-        return;
-      }
-
-      LOG.error("Shuffle error: ", cause);
-      if (ch.isConnected()) {
-        LOG.error("Shuffle error " + e);
-        sendError(ctx, INTERNAL_SERVER_ERROR);
-      }
-    }
-
-  }
-
-  public int getPort() {
-    return port;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3ab7310f/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/InMemoryShuffleSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/InMemoryShuffleSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/InMemoryShuffleSorter.java
deleted file mode 100644
index 92ae916..0000000
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/InMemoryShuffleSorter.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.runtime.library.common.sort.impl.dflt;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.IntBuffer;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.util.DataChecksum;
-import org.apache.tez.runtime.api.TezOutputContext;
-import org.apache.tez.runtime.library.common.shuffle.impl.ShuffleHeader;
-import org.apache.tez.runtime.library.common.shuffle.server.ShuffleHandler;
-import org.apache.tez.runtime.library.common.sort.impl.IFile;
-
-public class InMemoryShuffleSorter extends DefaultSorter {
-
-  private static final Log LOG = LogFactory.getLog(InMemoryShuffleSorter.class);
-  
-  static final int IFILE_EOF_LENGTH = 
-      2 * WritableUtils.getVIntSize(IFile.EOF_MARKER);
-  static final int IFILE_CHECKSUM_LENGTH = DataChecksum.Type.CRC32.size;
-  
-  private List<Integer> spillIndices = new ArrayList<Integer>();
-  private List<ShuffleHeader> shuffleHeaders = new ArrayList<ShuffleHeader>();
-
-  ShuffleHandler shuffleHandler = new ShuffleHandler(this);
-  
-  byte[] kvbuffer;
-  IntBuffer kvmeta;
-
-  @Override
-  public void initialize(TezOutputContext outputContext, Configuration conf, int numOutputs) throws IOException {
-    super.initialize(outputContext, conf, numOutputs);
-    shuffleHandler.initialize(outputContext, conf);
-  }
-
-  @Override
-  protected void spill(int mstart, int mend) 
-      throws IOException, InterruptedException {
-    // Start the shuffleHandler
-    shuffleHandler.start();
-
-    // Don't spill!
-    
-    // Make a copy
-    this.kvbuffer = super.kvbuffer;
-    this.kvmeta = super.kvmeta;
-
-    // Just save spill-indices for serving later
-    int spindex = mstart;
-    for (int i = 0; i < partitions; ++i) {
-      spillIndices.add(spindex);
-      
-      int length = 0;
-      while (spindex < mend &&
-          kvmeta.get(offsetFor(spindex) + PARTITION) == i) {
-
-        final int kvoff = offsetFor(spindex);
-        int keyLen = 
-            kvmeta.get(kvoff + VALSTART) - kvmeta.get(kvoff + KEYSTART);
-        int valLen = getInMemVBytesLength(kvoff);
-        length += 
-            (keyLen + WritableUtils.getVIntSize(keyLen)) + 
-            (valLen + WritableUtils.getVIntSize(valLen));
-
-        ++spindex;
-      }
-      length += IFILE_EOF_LENGTH;
-      
-      shuffleHeaders.add( 
-          new ShuffleHeader(
-              outputContext.getUniqueIdentifier(), // TODO Verify that this is correct. 
-              length + IFILE_CHECKSUM_LENGTH, length, i)
-          );
-      LOG.info("shuffleHeader[" + i + "]:" +
-      		" rawLen=" + length + " partLen=" + (length + IFILE_CHECKSUM_LENGTH) + 
-          " spillIndex=" + spillIndices.get(i));
-    }
-    
-    LOG.info("Saved " + spillIndices.size() + " spill-indices and " + 
-        shuffleHeaders.size() + " shuffle headers");
-  }
-
-  @Override
-  public InputStream getSortedStream(int partition) {
-    return new SortBufferInputStream(this, partition);
-  }
-
-  @Override
-  public void close() throws IOException {
-    // FIXME
-    //shuffleHandler.stop();
-  }
-
-  @Override
-  public ShuffleHeader getShuffleHeader(int reduce) {
-    return shuffleHeaders.get(reduce);
-  }
-
-  public int getSpillIndex(int partition) {
-    return spillIndices.get(partition);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3ab7310f/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/SortBufferInputStream.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/SortBufferInputStream.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/SortBufferInputStream.java
deleted file mode 100644
index b7b1e82..0000000
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/SortBufferInputStream.java
+++ /dev/null
@@ -1,271 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.runtime.library.common.sort.impl.dflt;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.IntBuffer;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.BoundedByteArrayOutputStream;
-import org.apache.hadoop.io.DataInputBuffer;
-import org.apache.hadoop.io.WritableUtils;
-import org.apache.tez.runtime.library.common.shuffle.impl.InMemoryWriter;
-import org.apache.tez.runtime.library.common.sort.impl.dflt.DefaultSorter.InMemValBytes;
-
-  public class SortBufferInputStream extends InputStream {
-
-  private static final Log LOG = LogFactory.getLog(SortBufferInputStream.class);
-  
-  private final InMemoryShuffleSorter sorter;
-  private InMemoryWriter sortOutput;
-  
-  private int mend;
-  private int recIndex;
-  private final byte[] kvbuffer;       
-  private final IntBuffer kvmeta;
-  private final int partitionBytes;
-  private final int partition;
-  
-  byte[] dualBuf = new byte[8192];
-  DualBufferOutputStream out;
-  private int readBytes = 0;
-  
-  public SortBufferInputStream(
-      InMemoryShuffleSorter sorter, int partition) {
-    this.sorter = sorter;
-    this.partitionBytes = 
-        (int)sorter.getShuffleHeader(partition).getCompressedLength();
-    this.partition = partition;
-    this.mend = sorter.getMetaEnd();
-    this.recIndex = sorter.getSpillIndex(partition);
-    this.kvbuffer = sorter.kvbuffer;
-    this.kvmeta = sorter.kvmeta;
-    out = new DualBufferOutputStream(null, 0, 0, dualBuf);
-    sortOutput = new InMemoryWriter(out);
-  }
-  
-  byte[] one = new byte[1];
-  
-  @Override
-  public int read() throws IOException {
-    int b = read(one, 0, 1);
-    return (b == -1) ? b : one[0]; 
-  }
-
-  @Override
-  public int read(byte[] b) throws IOException {
-    return read(b, 0, b.length);
-  }
-
-  @Override
-  public int read(byte[] b, int off, int len) throws IOException {
-    if (available() == 0) {
-      return -1;
-    }
-    
-    int currentOffset = off;
-    int currentLength = len;
-    int currentReadBytes = 0;
-    
-    // Check if there is residual data in the dualBuf
-    int residualLen = out.getCurrent();
-    if (residualLen > 0) {
-      int readable = Math.min(currentLength, residualLen);
-      System.arraycopy(dualBuf, 0, b, currentOffset, readable);
-      currentOffset += readable;
-      currentReadBytes += readable;
-      out.setCurrentPointer(-readable);
-      
-      // buffer has less capacity
-      currentLength -= readable;
-      
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("XXX read_residual:" +
-            " readable=" + readable +
-            " readBytes=" + readBytes);
-      }
-    }
-    
-    // Now, use the provided buffer
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("XXX read: out.reset" +
-          " b=" + b + 
-          " currentOffset=" + currentOffset + 
-          " currentLength=" + currentLength +
-          " recIndex=" + recIndex);
-    }
-    out.reset(b, currentOffset, currentLength);
-    
-    // Read from sort-buffer into the provided buffer, space permitting
-    DataInputBuffer key = new DataInputBuffer();
-    final InMemValBytes value = sorter.createInMemValBytes();
-    
-    int kvPartition = 0;
-    int numRec = 0;
-    for (;
-         currentLength > 0 && recIndex < mend && 
-             (kvPartition = getKVPartition(recIndex)) == partition;
-        ++recIndex) {
-      
-      final int kvoff = sorter.offsetFor(recIndex);
-      
-      int keyLen = 
-          (kvmeta.get(kvoff + InMemoryShuffleSorter.VALSTART) - 
-              kvmeta.get(kvoff + InMemoryShuffleSorter.KEYSTART));
-      key.reset(
-          kvbuffer, 
-          kvmeta.get(kvoff + InMemoryShuffleSorter.KEYSTART),
-          keyLen
-          );
-      
-      int valLen = sorter.getVBytesForOffset(kvoff, value);
-
-      int recLen = 
-          (keyLen + WritableUtils.getVIntSize(keyLen)) + 
-          (valLen + WritableUtils.getVIntSize(valLen));
-      
-      currentReadBytes += recLen;
-      currentOffset += recLen;
-      currentLength -= recLen;
-
-      // Write out key/value into the in-mem ifile
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("XXX read: sortOutput.append" +
-            " #rec=" + ++numRec +
-            " recIndex=" + recIndex + " kvoff=" + kvoff + 
-            " keyLen=" + keyLen + " valLen=" + valLen + " recLen=" + recLen +
-            " readBytes=" + readBytes +
-            " currentReadBytes="  + currentReadBytes +
-            " currentLength=" + currentLength);
-      }
-      sortOutput.append(key, value);
-    }
-
-    // If we are at the end of the segment, close the ifile
-    if (currentLength > 0 && 
-        (recIndex == mend || kvPartition != partition)) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("XXX About to call close:" +
-            " currentLength=" + currentLength + 
-            " recIndex=" + recIndex + " mend=" + mend + 
-            " kvPartition=" + kvPartition + " partitino=" + partition);
-      }
-      sortOutput.close();
-      currentReadBytes += 
-          (InMemoryShuffleSorter.IFILE_EOF_LENGTH + 
-              InMemoryShuffleSorter.IFILE_CHECKSUM_LENGTH);
-    } else {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("XXX Hmm..." +
-            " currentLength=" + currentLength + 
-            " recIndex=" + recIndex + " mend=" + mend + 
-            " kvPartition=" + kvPartition + " partitino=" + partition);
-      }
-    }
-    
-    int retVal = Math.min(currentReadBytes, len);
-    readBytes += retVal;
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("XXX read: done" +
-          " retVal=" + retVal + 
-          " currentReadBytes=" + currentReadBytes +
-          " len=" + len + 
-          " readBytes=" + readBytes +
-          " partitionBytes=" + partitionBytes +
-          " residualBytes=" + out.getCurrent());
-    }
-    return retVal;
-  }
-
-  private int getKVPartition(int recIndex) {
-    return kvmeta.get(
-        sorter.offsetFor(recIndex) + InMemoryShuffleSorter.PARTITION);
-  }
-  
-  @Override
-  public int available() throws IOException {
-    return (partitionBytes - readBytes);
-  }
-
-  @Override
-  public void close() throws IOException {
-    super.close();
-  }
-
-  @Override
-  public boolean markSupported() {
-    return false;
-  }
-  
-  static class DualBufferOutputStream extends BoundedByteArrayOutputStream {
-
-    byte[] dualBuf;
-    int currentPointer = 0;
-    byte[] one = new byte[1];
-    
-    public DualBufferOutputStream(
-        byte[] buf, int offset, int length, 
-        byte[] altBuf) {
-      super(buf, offset, length);
-      this.dualBuf = altBuf;
-    }
-    
-    public void reset(byte[] b, int off, int len) {
-      super.resetBuffer(b, off, len);
-    }
-
-    @Override
-    public void write(int b) throws IOException {
-      one[0] = (byte)b;
-      write(one, 0, 1);
-    }
-
-    @Override
-    public void write(byte[] b) throws IOException {
-      write(b, 0, b.length);
-    }
-
-    @Override
-    public void write(byte[] b, int off, int len) throws IOException {
-      int available = super.available();
-      if (available >= len) {
-        super.write(b, off, len);
-      } else {
-        super.write(b, off, available);
-        System.arraycopy(b, off+available, dualBuf, currentPointer, len-available);
-        currentPointer += (len - available);
-      }
-    }
-    
-    int getCurrent() {
-      return currentPointer;
-    }
-    
-    void setCurrentPointer(int delta) {
-      if ((currentPointer + delta) > dualBuf.length) {
-        throw new IndexOutOfBoundsException("Trying to set dualBuf 'current'" +
-        		" marker to " + (currentPointer+delta) + " when " +
-        		" dualBuf.length is " + dualBuf.length);
-      }
-      currentPointer = (currentPointer + delta) % dualBuf.length;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3ab7310f/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/InMemorySortedOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/InMemorySortedOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/InMemorySortedOutput.java
deleted file mode 100644
index 2a2872c..0000000
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/InMemorySortedOutput.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tez.runtime.library.output;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.tez.common.TezUtils;
-import org.apache.tez.runtime.api.Event;
-import org.apache.tez.runtime.api.LogicalOutput;
-import org.apache.tez.runtime.api.Output;
-import org.apache.tez.runtime.api.TezOutputContext;
-import org.apache.tez.runtime.api.Writer;
-import org.apache.tez.runtime.library.api.KeyValueWriter;
-import org.apache.tez.runtime.library.common.sort.impl.dflt.InMemoryShuffleSorter;
-
-/**
- * {@link InMemorySortedOutput} is an {@link Output} which sorts key/value pairs 
- * written to it and persists it to a file.
- */
-public class InMemorySortedOutput implements LogicalOutput {
-  
-  protected InMemoryShuffleSorter sorter;
-  protected int numTasks;
-  protected TezOutputContext outputContext;
-  
-
-  @Override
-  public List<Event> initialize(TezOutputContext outputContext)
-      throws IOException {
-    this.outputContext = outputContext;
-    this.sorter = new InMemoryShuffleSorter();
-    sorter.initialize(outputContext, TezUtils.createConfFromUserPayload(outputContext.getUserPayload()), numTasks);
-    return Collections.emptyList();
-  }
-
-  @Override
-  public Writer getWriter() throws IOException {
-    return new KeyValueWriter() {
-      
-      @Override
-      public void write(Object key, Object value) throws IOException {
-        sorter.write(key, value);
-      }
-    };
-  }
-
-  @Override
-  public void handleEvents(List<Event> outputEvents) {
-    // No events expected.
-  }
-
-  @Override
-  public void setNumPhysicalOutputs(int numOutputs) {
-    this.numTasks = numOutputs;
-  }
-  
-  @Override
-  public List<Event> close() throws IOException {
-    sorter.flush();
-    sorter.close();
-    // TODO NEWTEZ Event generation
-    return null;
-  }
-}


Mime
View raw message