tez-commits mailing list archives

From acmur...@apache.org
Subject svn commit: r1457129 [9/38] - in /incubator/tez: ./ tez-ampool/ tez-ampool/src/ tez-ampool/src/main/ tez-ampool/src/main/bin/ tez-ampool/src/main/conf/ tez-ampool/src/main/java/ tez-ampool/src/main/java/org/ tez-ampool/src/main/java/org/apache/ tez-amp...
Date Fri, 15 Mar 2013 21:26:48 GMT
Added: incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/server/ShuffleHandler.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/server/ShuffleHandler.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/server/ShuffleHandler.java (added)
+++ incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/server/ShuffleHandler.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,555 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.engine.common.shuffle.server;
+
+import static org.jboss.netty.buffer.ChannelBuffers.wrappedBuffer;
+import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
+import static org.jboss.netty.handler.codec.http.HttpMethod.GET;
+import static org.jboss.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST;
+import static org.jboss.netty.handler.codec.http.HttpResponseStatus.FORBIDDEN;
+import static org.jboss.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
+import static org.jboss.netty.handler.codec.http.HttpResponseStatus.METHOD_NOT_ALLOWED;
+import static org.jboss.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
+import static org.jboss.netty.handler.codec.http.HttpResponseStatus.OK;
+import static org.jboss.netty.handler.codec.http.HttpResponseStatus.UNAUTHORIZED;
+import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.URL;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+import javax.crypto.SecretKey;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DataInputByteBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MutableCounterInt;
+import org.apache.hadoop.metrics2.lib.MutableCounterLong;
+import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
+import org.apache.hadoop.security.ssl.SSLFactory;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServices;
+import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.common.TezTask;
+import org.apache.tez.engine.common.security.JobTokenIdentifier;
+import org.apache.tez.engine.common.security.JobTokenSecretManager;
+import org.apache.tez.engine.common.security.SecureShuffleUtils;
+import org.apache.tez.engine.common.shuffle.impl.ShuffleHeader;
+import org.apache.tez.engine.common.sort.impl.ExternalSorter;
+import org.apache.tez.records.TezJobID;
+import org.jboss.netty.bootstrap.ServerBootstrap;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelFutureListener;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+import org.jboss.netty.channel.group.ChannelGroup;
+import org.jboss.netty.channel.group.DefaultChannelGroup;
+import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
+import org.jboss.netty.handler.codec.frame.TooLongFrameException;
+import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
+import org.jboss.netty.handler.codec.http.HttpChunkAggregator;
+import org.jboss.netty.handler.codec.http.HttpRequest;
+import org.jboss.netty.handler.codec.http.HttpRequestDecoder;
+import org.jboss.netty.handler.codec.http.HttpResponse;
+import org.jboss.netty.handler.codec.http.HttpResponseEncoder;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+import org.jboss.netty.handler.codec.http.QueryStringDecoder;
+import org.jboss.netty.handler.ssl.SslHandler;
+import org.jboss.netty.handler.stream.ChunkedStream;
+import org.jboss.netty.handler.stream.ChunkedWriteHandler;
+import org.jboss.netty.util.CharsetUtil;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+public class ShuffleHandler extends AbstractService 
+    implements AuxServices.AuxiliaryService {
+
+  private static final Log LOG = LogFactory.getLog(ShuffleHandler.class);
+  
+  public static final String SHUFFLE_MANAGE_OS_CACHE = "mapreduce.shuffle.manage.os.cache";
+  public static final boolean DEFAULT_SHUFFLE_MANAGE_OS_CACHE = true;
+
+  public static final String SHUFFLE_READAHEAD_BYTES = "mapreduce.shuffle.readahead.bytes";
+  public static final int DEFAULT_SHUFFLE_READAHEAD_BYTES = 4 * 1024 * 1024;
+
+  private int port;
+  private ChannelFactory selector;
+  private final ChannelGroup accepted = new DefaultChannelGroup();
+  private HttpPipelineFactory pipelineFact;
+  private int sslFileBufferSize;
+
+  public static final String MAPREDUCE_SHUFFLE_SERVICEID =
+      "mapreduce.shuffle";
+
+  private static final Map<String,String> userRsrc =
+    new ConcurrentHashMap<String,String>();
+  private static final JobTokenSecretManager secretManager =
+    new JobTokenSecretManager();
+  private SecretKey tokenSecret;
+
+  public static final String SHUFFLE_PORT_CONFIG_KEY = "mapreduce.shuffle.port";
+  public static final int DEFAULT_SHUFFLE_PORT = 8080;
+
+  public static final String SUFFLE_SSL_FILE_BUFFER_SIZE_KEY =
+    "mapreduce.shuffle.ssl.file.buffer.size";
+
+  public static final int DEFAULT_SUFFLE_SSL_FILE_BUFFER_SIZE = 60 * 1024;
+
+  private ExternalSorter sorter;
+  
+  @Metrics(about="Shuffle output metrics", context="mapred")
+  static class ShuffleMetrics implements ChannelFutureListener {
+    @Metric("Shuffle output in bytes")
+        MutableCounterLong shuffleOutputBytes;
+    @Metric("# of failed shuffle outputs")
+        MutableCounterInt shuffleOutputsFailed;
+    @Metric("# of succeeeded shuffle outputs")
+        MutableCounterInt shuffleOutputsOK;
+    @Metric("# of current shuffle connections")
+        MutableGaugeInt shuffleConnections;
+
+    @Override
+    public void operationComplete(ChannelFuture future) throws Exception {
+      if (future.isSuccess()) {
+        shuffleOutputsOK.incr();
+      } else {
+        shuffleOutputsFailed.incr();
+      }
+      shuffleConnections.decr();
+    }
+  }
+
+  final ShuffleMetrics metrics;
+
+  ShuffleHandler(MetricsSystem ms) {
+    super("httpshuffle");
+    metrics = ms.register(new ShuffleMetrics());
+  }
+
+  public ShuffleHandler(ExternalSorter sorter) {
+    this(DefaultMetricsSystem.instance());
+    this.sorter = sorter;
+  }
+
+  /**
+   * Serialize the shuffle port into a ByteBuffer for use later on.
+   * @param port the port to be sent to the ApplicationMaster
+   * @return the serialized form of the port.
+   */
+  public static ByteBuffer serializeMetaData(int port) throws IOException {
+    //TODO these bytes should be versioned
+    DataOutputBuffer port_dob = new DataOutputBuffer();
+    port_dob.writeInt(port);
+    return ByteBuffer.wrap(port_dob.getData(), 0, port_dob.getLength());
+  }
+
+  /**
+   * A helper function to deserialize the metadata returned by ShuffleHandler.
+   * @param meta the metadata returned by the ShuffleHandler
+   * @return the port the Shuffle Handler is listening on to serve shuffle data.
+   */
+  public static int deserializeMetaData(ByteBuffer meta) throws IOException {
+    //TODO this should be returning a class not just an int
+    DataInputByteBuffer in = new DataInputByteBuffer();
+    in.reset(meta);
+    int port = in.readInt();
+    return port;
+  }
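+
+  // Illustrative round trip (not part of this commit): the AM would normally
+  // receive this buffer via the aux service's getMeta(), but the pairing is:
+  //   ByteBuffer meta = ShuffleHandler.serializeMetaData(8080);
+  //   int port = ShuffleHandler.deserializeMetaData(meta);  // port == 8080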
+
+  /**
+   * A helper function to serialize the JobTokenIdentifier to be sent to the
+   * ShuffleHandler as ServiceData.
+   * @param jobToken the job token to be used for authentication of
+   * shuffle data requests.
+   * @return the serialized version of the jobToken.
+   */
+  public static ByteBuffer serializeServiceData(Token<JobTokenIdentifier> jobToken) throws IOException {
+    //TODO these bytes should be versioned
+    DataOutputBuffer jobToken_dob = new DataOutputBuffer();
+    jobToken.write(jobToken_dob);
+    return ByteBuffer.wrap(jobToken_dob.getData(), 0, jobToken_dob.getLength());
+  }
+
+  static Token<JobTokenIdentifier> deserializeServiceData(ByteBuffer secret) throws IOException {
+    DataInputByteBuffer in = new DataInputByteBuffer();
+    in.reset(secret);
+    Token<JobTokenIdentifier> jt = new Token<JobTokenIdentifier>();
+    jt.readFields(in);
+    return jt;
+  }
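+
+  // Sketch of the intended handshake (illustrative): the client serializes the
+  // job token as service data when launching containers, and the NM hands the
+  // same bytes back to initApp() below, which recovers the token:
+  //   ByteBuffer sd = ShuffleHandler.serializeServiceData(jobToken);
+  //   Token<JobTokenIdentifier> jt = ShuffleHandler.deserializeServiceData(sd);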
+
+  
+  
+  @Override
+  public void initApp(String user, ApplicationId appId, ByteBuffer secret) {
+    // TODO these bytes should be versioned
+    try {
+      Token<JobTokenIdentifier> jt = deserializeServiceData(secret);
+      // TODO: Once Shuffle is out of NM, this can use MR APIs
+      TezJobID jobId = new TezJobID(Long.toString(appId.getClusterTimestamp()), appId.getId());
+      userRsrc.put(jobId.toString(), user);
+      LOG.info("Added token for " + jobId.toString());
+      secretManager.addTokenForJob(jobId.toString(), jt);
+    } catch (IOException e) {
+      LOG.error("Error during initApp", e);
+      // TODO add API to AuxiliaryServices to report failures
+    }
+  }
+
+  @Override
+  public void stopApp(ApplicationId appId) {
+    TezJobID jobId = new TezJobID(Long.toString(appId.getClusterTimestamp()), appId.getId());
+    secretManager.removeTokenForJob(jobId.toString());
+    userRsrc.remove(jobId.toString());
+  }
+
+  public synchronized void init(Configuration conf, TezTask task) {
+    this.init(conf);
+    tokenSecret = task.getJobTokenSecret();
+  }
+
+  @Override
+  public synchronized void init(Configuration conf) {
+    ThreadFactory bossFactory = new ThreadFactoryBuilder()
+      .setNameFormat("ShuffleHandler Netty Boss #%d")
+      .build();
+    ThreadFactory workerFactory = new ThreadFactoryBuilder()
+      .setNameFormat("ShuffleHandler Netty Worker #%d")
+      .build();
+    
+    selector = new NioServerSocketChannelFactory(
+        Executors.newCachedThreadPool(bossFactory),
+        Executors.newCachedThreadPool(workerFactory));    
+    super.init(new Configuration(conf));
+  }
+
+  // TODO change AbstractService to throw InterruptedException
+  @Override
+  public synchronized void start() {
+    Configuration conf = getConfig();
+    ServerBootstrap bootstrap = new ServerBootstrap(selector);
+    try {
+      pipelineFact = new HttpPipelineFactory(conf);
+    } catch (Exception ex) {
+      throw new RuntimeException(ex);
+    }
+    bootstrap.setPipelineFactory(pipelineFact);
+    // Let OS pick the port
+    Channel ch = bootstrap.bind(new InetSocketAddress(0));
+    accepted.add(ch);
+    port = ((InetSocketAddress)ch.getLocalAddress()).getPort();
+    conf.set(SHUFFLE_PORT_CONFIG_KEY, Integer.toString(port));
+    pipelineFact.SHUFFLE.setPort(port);
+    LOG.info(getName() + " listening on port " + port);
+    super.start();
+
+    sslFileBufferSize = conf.getInt(SUFFLE_SSL_FILE_BUFFER_SIZE_KEY,
+                                    DEFAULT_SUFFLE_SSL_FILE_BUFFER_SIZE);
+  }
+
+  @Override
+  public synchronized void stop() {
+    accepted.close().awaitUninterruptibly(10, TimeUnit.SECONDS);
+    ServerBootstrap bootstrap = new ServerBootstrap(selector);
+    bootstrap.releaseExternalResources();
+    pipelineFact.destroy();
+    super.stop();
+  }
+
+  @Override
+  public synchronized ByteBuffer getMeta() {
+    try {
+      return serializeMetaData(port); 
+    } catch (IOException e) {
+      LOG.error("Error during getMeta", e);
+      // TODO add API to AuxiliaryServices to report failures
+      return null;
+    }
+  }
+
+  class HttpPipelineFactory implements ChannelPipelineFactory {
+
+    final Shuffle SHUFFLE;
+    private SSLFactory sslFactory;
+
+    public HttpPipelineFactory(Configuration conf) throws Exception {
+      SHUFFLE = new Shuffle(conf);
+      if (conf.getBoolean(TezJobConfig.TEZ_ENGINE_SHUFFLE_ENABLE_SSL,
+              TezJobConfig.DEFAULT_TEZ_ENGINE_SHUFFLE_ENABLE_SSL)) {
+        sslFactory = new SSLFactory(SSLFactory.Mode.SERVER, conf);
+        sslFactory.init();
+      }
+    }
+
+    public void destroy() {
+      if (sslFactory != null) {
+        sslFactory.destroy();
+      }
+    }
+
+    @Override
+    public ChannelPipeline getPipeline() throws Exception {
+      ChannelPipeline pipeline = Channels.pipeline();
+      if (sslFactory != null) {
+        pipeline.addLast("ssl", new SslHandler(sslFactory.createSSLEngine()));
+      }
+      pipeline.addLast("decoder", new HttpRequestDecoder());
+      pipeline.addLast("aggregator", new HttpChunkAggregator(1 << 16));
+      pipeline.addLast("encoder", new HttpResponseEncoder());
+      pipeline.addLast("chunking", new ChunkedWriteHandler());
+      pipeline.addLast("shuffle", SHUFFLE);
+      return pipeline;
+      // TODO factor security manager into pipeline
+      // TODO factor out encode/decode to permit binary shuffle
+      // TODO factor out decode of index to permit alt. models
+    }
+
+  }
+
+  class Shuffle extends SimpleChannelUpstreamHandler {
+
+    private final Configuration conf;
+    private int port;
+
+    public Shuffle(Configuration conf) {
+      this.conf = conf;
+      this.port = conf.getInt(SHUFFLE_PORT_CONFIG_KEY, DEFAULT_SHUFFLE_PORT);
+    }
+    
+    public void setPort(int port) {
+      this.port = port;
+    }
+
+    private List<String> splitMaps(List<String> mapq) {
+      if (null == mapq) {
+        return null;
+      }
+      final List<String> ret = new ArrayList<String>();
+      for (String s : mapq) {
+        Collections.addAll(ret, s.split(","));
+      }
+      return ret;
+    }
+
+    @Override
+    public void messageReceived(ChannelHandlerContext ctx, MessageEvent evt)
+        throws Exception {
+      HttpRequest request = (HttpRequest) evt.getMessage();
+      if (request.getMethod() != GET) {
+        sendError(ctx, METHOD_NOT_ALLOWED);
+        return;
+      }
+      final Map<String,List<String>> q =
+        new QueryStringDecoder(request.getUri()).getParameters();
+      final List<String> mapIds = splitMaps(q.get("map"));
+      final List<String> reduceQ = q.get("reduce");
+      final List<String> jobQ = q.get("job");
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("RECV: " + request.getUri() +
+            "\n  mapId: " + mapIds +
+            "\n  reduceId: " + reduceQ +
+            "\n  jobId: " + jobQ);
+      }
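+
+      // An example fetch URI this handler accepts (illustrative values):
+      //   /mapOutput?job=job_1363383996000_0001&reduce=0
+      //       &map=attempt_1363383996000_0001_m_000000_0,attempt_..._m_000001_0
+      // "map" may repeat and/or carry comma-separated ids; see splitMaps().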
+
+      if (mapIds == null || reduceQ == null || jobQ == null) {
+        sendError(ctx, "Required param job, map and reduce", BAD_REQUEST);
+        return;
+      }
+      if (reduceQ.size() != 1 || jobQ.size() != 1) {
+        sendError(ctx, "Too many job/reduce parameters", BAD_REQUEST);
+        return;
+      }
+      int reduceId;
+      String jobId;
+      try {
+        reduceId = Integer.parseInt(reduceQ.get(0));
+        jobId = jobQ.get(0);
+      } catch (NumberFormatException e) {
+        sendError(ctx, "Bad reduce parameter", BAD_REQUEST);
+        return;
+      } catch (IllegalArgumentException e) {
+        sendError(ctx, "Bad job parameter", BAD_REQUEST);
+        return;
+      }
+
+      final String reqUri = request.getUri();
+      if (null == reqUri) {
+        // TODO? add upstream?
+        sendError(ctx, FORBIDDEN);
+        return;
+      }
+      HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
+      try {
+        verifyRequest(jobId, ctx, request, response,
+            new URL("http", "", this.port, reqUri));
+      } catch (IOException e) {
+        LOG.warn("Shuffle failure ", e);
+        sendError(ctx, e.getMessage(), UNAUTHORIZED);
+        return;
+      }
+
+      Channel ch = evt.getChannel();
+      ch.write(response);
+      // TODO refactor the following into the pipeline
+      ChannelFuture lastMap = null;
+      for (String mapId : mapIds) {
+        try {
+          // TODO: Error handling - validate mapId via TezTaskAttemptId.forName
+          if (!mapId.equals(sorter.getTaskAttemptId().toString())) {
+            String errorMessage =
+                "Illegal shuffle request mapId: " + mapId
+                    + " while actual mapId is " + sorter.getTaskAttemptId(); 
+            LOG.warn(errorMessage);
+            sendError(ctx, errorMessage, BAD_REQUEST);
+            return;
+          }
+
+          lastMap =
+            sendMapOutput(ctx, ch, userRsrc.get(jobId), jobId, mapId, reduceId);
+          if (null == lastMap) {
+            sendError(ctx, NOT_FOUND);
+            return;
+          }
+        } catch (IOException e) {
+          LOG.error("Shuffle error ", e);
+          sendError(ctx, e.getMessage(), INTERNAL_SERVER_ERROR);
+          return;
+        }
+      }
+      lastMap.addListener(metrics);
+      lastMap.addListener(ChannelFutureListener.CLOSE);
+    }
+
+    private void verifyRequest(String appid, ChannelHandlerContext ctx,
+        HttpRequest request, HttpResponse response, URL requestUri)
+        throws IOException {
+      if (null == tokenSecret) {
+        LOG.info("Request for unknown token " + appid);
+        throw new IOException("could not find jobid");
+      }
+      // string to encrypt
+      String enc_str = SecureShuffleUtils.buildMsgFrom(requestUri);
+      // hash from the fetcher
+      String urlHashStr =
+        request.getHeader(SecureShuffleUtils.HTTP_HEADER_URL_HASH);
+      if (urlHashStr == null) {
+        LOG.info("Missing header hash for " + appid);
+        throw new IOException("fetcher cannot be authenticated");
+      }
+      if (LOG.isDebugEnabled()) {
+        int len = urlHashStr.length();
+        LOG.debug("verifying request. enc_str=" + enc_str + "; hash=..." +
+            urlHashStr.substring(len-len/2, len-1));
+      }
+      // verify - throws exception
+      SecureShuffleUtils.verifyReply(urlHashStr, enc_str, tokenSecret);
+      // verification passed - encode the reply
+      String reply =
+        SecureShuffleUtils.generateHash(urlHashStr.getBytes(), tokenSecret);
+      response.setHeader(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH, reply);
+      if (LOG.isDebugEnabled()) {
+        int len = reply.length();
+        LOG.debug("Fetcher request verfied. enc_str=" + enc_str + ";reply=" +
+            reply.substring(len-len/2, len-1));
+      }
+    }
+
+    protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx, Channel ch,
+        String user, String jobId, String mapId, int reduce)
+        throws IOException {
+      final ShuffleHeader header = sorter.getShuffleHeader(reduce);
+      final DataOutputBuffer dob = new DataOutputBuffer();
+      header.write(dob);
+      ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength()));
+
+      ChannelFuture writeFuture =
+          ch.write(
+              new ChunkedStream(
+                  sorter.getSortedStream(reduce), sslFileBufferSize
+                  )
+              );
+      metrics.shuffleConnections.incr();
+      metrics.shuffleOutputBytes.incr(header.getCompressedLength()); // optimistic
+      return writeFuture;
+    }
+
+    private void sendError(ChannelHandlerContext ctx,
+        HttpResponseStatus status) {
+      sendError(ctx, "", status);
+    }
+
+    private void sendError(ChannelHandlerContext ctx, String message,
+        HttpResponseStatus status) {
+      HttpResponse response = new DefaultHttpResponse(HTTP_1_1, status);
+      response.setHeader(CONTENT_TYPE, "text/plain; charset=UTF-8");
+      response.setContent(
+        ChannelBuffers.copiedBuffer(message, CharsetUtil.UTF_8));
+
+      // Close the connection as soon as the error message is sent.
+      ctx.getChannel().write(response).addListener(ChannelFutureListener.CLOSE);
+    }
+
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
+        throws Exception {
+      Channel ch = e.getChannel();
+      Throwable cause = e.getCause();
+      if (cause instanceof TooLongFrameException) {
+        sendError(ctx, BAD_REQUEST);
+        return;
+      }
+
+      LOG.error("Shuffle error: ", cause);
+      if (ch.isConnected()) {
+        LOG.error("Shuffle error " + e);
+        sendError(ctx, INTERNAL_SERVER_ERROR);
+      }
+    }
+
+  }
+
+  public int getPort() {
+    return port;
+  }
+}

Added: incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/sort/SortingOutput.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/sort/SortingOutput.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/sort/SortingOutput.java (added)
+++ incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/sort/SortingOutput.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.engine.common.sort;
+
+import org.apache.tez.api.Output;
+import org.apache.tez.common.TezTask;
+
+/**
+ * {@link SortingOutput} is an {@link Output} which sorts incoming key/value
+ * pairs.
+ */
+public interface SortingOutput extends Output {
+  
+  public void setTask(TezTask task);
+  
+}

Added: incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/ExternalSorter.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/ExternalSorter.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/ExternalSorter.java (added)
+++ incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/ExternalSorter.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,234 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.engine.common.sort.impl;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.hadoop.io.serializer.Serializer;
+import org.apache.hadoop.util.IndexedSorter;
+import org.apache.hadoop.util.Progress;
+import org.apache.hadoop.util.QuickSort;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tez.api.Master;
+import org.apache.tez.api.Partitioner;
+import org.apache.tez.api.Processor;
+import org.apache.tez.common.Constants;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.common.TezTask;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.engine.common.ConfigUtils;
+import org.apache.tez.engine.common.combine.CombineInput;
+import org.apache.tez.engine.common.combine.CombineOutput;
+import org.apache.tez.engine.common.shuffle.impl.ShuffleHeader;
+import org.apache.tez.engine.common.sort.impl.IFile.Writer;
+import org.apache.tez.engine.common.task.local.output.TezTaskOutput;
+import org.apache.tez.engine.common.task.local.output.TezTaskOutputFiles;
+import org.apache.tez.records.TezTaskAttemptID;
+import org.apache.tez.records.OutputContext;
+
+@SuppressWarnings({"unchecked", "rawtypes"})
+public abstract class ExternalSorter {
+
+  private static final Log LOG = LogFactory.getLog(ExternalSorter.class);
+
+  public abstract void close() throws IOException, InterruptedException;
+
+  public abstract void flush() throws IOException, InterruptedException;
+
+  public abstract void write(Object key, Object value) throws IOException,
+      InterruptedException;
+
+  protected Processor combineProcessor;
+  protected Partitioner partitioner;
+  protected TezTask task;
+  protected Configuration job;
+  protected FileSystem rfs;
+  protected TezTaskOutput mapOutputFile;
+  protected int partitions;
+  protected Class keyClass;
+  protected Class valClass;
+  protected RawComparator comparator;
+  protected SerializationFactory serializationFactory;
+  protected Serializer keySerializer;
+  protected Serializer valSerializer;
+  
+  protected IndexedSorter sorter;
+
+  // Compression for map-outputs
+  protected CompressionCodec codec;
+  
+  // Counters
+  protected TezCounter mapOutputByteCounter;
+  protected TezCounter mapOutputRecordCounter;
+  protected TezCounter fileOutputByteCounter;
+  protected TezCounter spilledRecordsCounter;
+  protected Progress sortPhase;
+
+  public void initialize(Configuration conf, Master master)
+      throws IOException, InterruptedException {
+    
+    this.job = conf;
+    LOG.info("TEZ_ENGINE_TASK_ATTEMPT_ID: " + 
+        job.get(Constants.TEZ_ENGINE_TASK_ATTEMPT_ID));
+
+    partitions = 
+        job.getInt(
+            TezJobConfig.TEZ_ENGINE_TASK_OUTDEGREE, 
+            TezJobConfig.DEFAULT_TEZ_ENGINE_TASK_OUTDEGREE);
+    rfs = ((LocalFileSystem)FileSystem.getLocal(job)).getRaw();
+    
+    // sorter
+    sorter = ReflectionUtils.newInstance(job.getClass("map.sort.class",
+          QuickSort.class, IndexedSorter.class), job);
+    
+    comparator = ConfigUtils.getOutputKeyComparator(job);
+    
+    // k/v serialization
+    keyClass = ConfigUtils.getMapOutputKeyClass(job);
+    valClass = ConfigUtils.getMapOutputValueClass(job);
+    serializationFactory = new SerializationFactory(job);
+    keySerializer = serializationFactory.getSerializer(keyClass);
+    valSerializer = serializationFactory.getSerializer(valClass);
+    
+    //    counters
+    mapOutputByteCounter = 
+        task.getTaskReporter().getCounter(TaskCounter.MAP_OUTPUT_BYTES);
+    mapOutputRecordCounter =
+      task.getTaskReporter().getCounter(TaskCounter.MAP_OUTPUT_RECORDS);
+    fileOutputByteCounter = 
+        task.getTaskReporter().
+            getCounter(TaskCounter.MAP_OUTPUT_MATERIALIZED_BYTES);
+    spilledRecordsCounter = 
+        task.getTaskReporter().getCounter(TaskCounter.SPILLED_RECORDS);
+    // compression
+    if (ConfigUtils.getCompressMapOutput(job)) {
+      Class<? extends CompressionCodec> codecClass =
+          ConfigUtils.getMapOutputCompressorClass(job, DefaultCodec.class);
+      codec = ReflectionUtils.newInstance(codecClass, job);
+    } else {
+      codec = null;
+    }
+
+    // Task outputs 
+    mapOutputFile =
+        (TezTaskOutput) ReflectionUtils.newInstance(
+            conf.getClass(
+                Constants.TEZ_ENGINE_TASK_OUTPUT_MANAGER, 
+                TezTaskOutputFiles.class), conf);
+//    LOG.info("XXX mapOutputFile: " + mapOutputFile.getClass());
+    
+    // sortPhase
+    sortPhase  = task.getProgress().addPhase("sort", 0.333f);
+  }
+
+  /**
+   * Exception indicating that the allocated sort buffer is insufficient to hold
+   * the current record.
+   */
+  @SuppressWarnings("serial")
+  public static class MapBufferTooSmallException extends IOException {
+    public MapBufferTooSmallException(String s) {
+      super(s);
+    }
+  }
+
+  public void setTask(TezTask task) {
+    this.task = task;
+    this.combineProcessor = task.getCombineProcessor();
+    this.partitioner = task.getPartitioner();
+  }
+
+  public TezTaskAttemptID getTaskAttemptId() {
+    return task.getTaskAttemptId();
+  }
+
+  @Private
+  public TezTaskOutput getMapOutput() {
+    return mapOutputFile;
+  }
+
+  protected void runCombineProcessor(TezRawKeyValueIterator kvIter,
+      Writer writer) throws IOException, InterruptedException {
+
+    CombineInput combineIn = new CombineInput(kvIter);
+    combineIn.initialize(job, task.getTaskReporter());
+
+    CombineOutput combineOut = new CombineOutput(writer);
+    combineOut.initialize(job, task.getTaskReporter());
+
+    combineProcessor.process(combineIn, combineOut);
+
+    combineIn.close();
+    combineOut.close();
+
+  }
+
+  /**
+   * Rename srcPath to dstPath on the same volume. This is the same as
+   * RawLocalFileSystem's rename method, except that it will not fall back to a
+   * copy, and it will create the target directory if it doesn't exist.
+   */
+  protected void sameVolRename(Path srcPath, Path dstPath) throws IOException {
+    RawLocalFileSystem rfs = (RawLocalFileSystem) this.rfs;
+    File src = rfs.pathToFile(srcPath);
+    File dst = rfs.pathToFile(dstPath);
+    if (!dst.getParentFile().exists()) {
+      if (!dst.getParentFile().mkdirs()) {
+        throw new IOException("Unable to rename " + src + " to " + dst
+            + ": couldn't create parent directory");
+      }
+    }
+
+    if (!src.renameTo(dst)) {
+      throw new IOException("Unable to rename " + src + " to " + dst);
+    }
+//    LOG.info("XXX sameVolRename src=" + src + ", dst=" + dst);
+  }
+
+  public ExternalSorter() {
+    super();
+  }
+
+  public InputStream getSortedStream(int partition) {
+    throw new UnsupportedOperationException("getSortedStream isn't supported!");
+  }
+
+  public ShuffleHeader getShuffleHeader(int reduce) {
+    throw new UnsupportedOperationException("getShuffleHeader isn't supported!");
+  }
+
+  public OutputContext getOutputContext() {
+    return null;
+  }
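+
+  // Minimal subclass sketch (illustrative only): concrete sorters override the
+  // three abstract methods; shuffle-serving implementations also override
+  // getSortedStream()/getShuffleHeader() so ShuffleHandler can stream output.
+  //   ExternalSorter noop = new ExternalSorter() {
+  //     public void write(Object key, Object value) { /* collect + partition */ }
+  //     public void flush() { /* sort, spill, merge via mapOutputFile paths */ }
+  //     public void close() { /* release buffers */ }
+  //   };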
+}

Added: incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/IFile.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/IFile.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/IFile.java (added)
+++ incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/IFile.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,554 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.engine.common.sort.impl;
+
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.BufferUtils;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.io.compress.CodecPool;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionOutputStream;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.hadoop.io.serializer.Serializer;
+import org.apache.tez.common.counters.TezCounter;
+
+/**
+ * <code>IFile</code> is the simple <key-len, value-len, key, value> format
+ * for the intermediate map-outputs in Map-Reduce.
+ *
+ * There is a <code>Writer</code> to write out map-outputs in this format and 
+ * a <code>Reader</code> to read files of this format.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class IFile {
+  private static final Log LOG = LogFactory.getLog(IFile.class);
+  public static final int EOF_MARKER = -1; // End of File Marker
+  public static final int RLE_MARKER = -2; // Repeat same key marker
+  public static final DataInputBuffer REPEAT_KEY = new DataInputBuffer();
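+
+  // On-disk record layout (as written by Writer.append below; lengths are
+  // zero-compressed VInts):
+  //   <key-len><value-len><key bytes><value bytes>    regular record
+  //   <RLE_MARKER(-2)><value-len><value bytes>        key same as previous
+  //   <EOF_MARKER(-1)><EOF_MARKER(-1)>                end of stream
+  // followed by the IFileOutputStream checksum trailer.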
+    
+  /**
+   * <code>IFile.Writer</code> to write out intermediate map-outputs. 
+   */
+  @InterfaceAudience.Private
+  @InterfaceStability.Unstable
+  @SuppressWarnings({"unchecked", "rawtypes"})
+  public static class Writer {
+    FSDataOutputStream out;
+    boolean ownOutputStream = false;
+    long start = 0;
+    FSDataOutputStream rawOut;
+    
+    CompressionOutputStream compressedOut;
+    Compressor compressor;
+    boolean compressOutput = false;
+    
+    long decompressedBytesWritten = 0;
+    long compressedBytesWritten = 0;
+
+    // Count records written to disk
+    private long numRecordsWritten = 0;
+    private final TezCounter writtenRecordsCounter;
+
+    IFileOutputStream checksumOut;
+
+    Class keyClass;
+    Class valueClass;
+    Serializer keySerializer;
+    Serializer valueSerializer;
+    
+    DataOutputBuffer buffer = new DataOutputBuffer();
+    DataOutputBuffer previous = new DataOutputBuffer();
+    
+    // de-dup keys or not
+    private boolean rle = false;
+
+    public Writer(Configuration conf, FileSystem fs, Path file, 
+                  Class keyClass, Class valueClass,
+                  CompressionCodec codec,
+                  TezCounter writesCounter) throws IOException {
+      this(conf, fs.create(file), keyClass, valueClass, codec,
+           writesCounter);
+      ownOutputStream = true;
+    }
+    
+    protected Writer(TezCounter writesCounter) {
+      writtenRecordsCounter = writesCounter;
+    }
+
+    public Writer(Configuration conf, FSDataOutputStream out, 
+        Class keyClass, Class valueClass,
+        CompressionCodec codec, TezCounter writesCounter)
+        throws IOException {
+      this.writtenRecordsCounter = writesCounter;
+      this.checksumOut = new IFileOutputStream(out);
+      this.rawOut = out;
+      this.start = this.rawOut.getPos();
+      if (codec != null) {
+        this.compressor = CodecPool.getCompressor(codec);
+        if (this.compressor != null) {
+          this.compressor.reset();
+          this.compressedOut = codec.createOutputStream(checksumOut, compressor);
+          this.out = new FSDataOutputStream(this.compressedOut,  null);
+          this.compressOutput = true;
+        } else {
+          LOG.warn("Could not obtain compressor from CodecPool");
+          this.out = new FSDataOutputStream(checksumOut,null);
+        }
+      } else {
+        this.out = new FSDataOutputStream(checksumOut,null);
+      }
+      
+      this.keyClass = keyClass;
+      this.valueClass = valueClass;
+
+      if (keyClass != null) {
+        SerializationFactory serializationFactory = 
+          new SerializationFactory(conf);
+        this.keySerializer = serializationFactory.getSerializer(keyClass);
+        this.keySerializer.open(buffer);
+        this.valueSerializer = serializationFactory.getSerializer(valueClass);
+        this.valueSerializer.open(buffer);
+      }
+    }
+
+    public Writer(Configuration conf, FileSystem fs, Path file) 
+    throws IOException {
+      this(conf, fs, file, null, null, null, null);
+    }
+
+    public void close() throws IOException {
+
+      // When IFile writer is created by BackupStore, we do not have
+      // Key and Value classes set. So, check before closing the
+      // serializers
+      if (keyClass != null) {
+        keySerializer.close();
+        valueSerializer.close();
+      }
+
+      // Write EOF_MARKER for key/value length
+      WritableUtils.writeVInt(out, EOF_MARKER);
+      WritableUtils.writeVInt(out, EOF_MARKER);
+      decompressedBytesWritten += 2 * WritableUtils.getVIntSize(EOF_MARKER);
+      
+      //Flush the stream
+      out.flush();
+  
+      if (compressOutput) {
+        // Flush
+        compressedOut.finish();
+        compressedOut.resetState();
+      }
+      
+      // Close the underlying stream iff we own it...
+      if (ownOutputStream) {
+        out.close();
+      }
+      else {
+        // Write the checksum
+        checksumOut.finish();
+      }
+
+      compressedBytesWritten = rawOut.getPos() - start;
+
+      if (compressOutput) {
+        // Return back the compressor
+        CodecPool.returnCompressor(compressor);
+        compressor = null;
+      }
+
+      out = null;
+      if(writtenRecordsCounter != null) {
+        writtenRecordsCounter.increment(numRecordsWritten);
+      }
+    }
+
+    public void append(Object key, Object value) throws IOException {
+      if (key.getClass() != keyClass)
+        throw new IOException("wrong key class: "+ key.getClass()
+                              +" is not "+ keyClass);
+      if (value.getClass() != valueClass)
+        throw new IOException("wrong value class: "+ value.getClass()
+                              +" is not "+ valueClass);
+      
+      boolean sameKey = false;
+
+      // Append the 'key'
+      keySerializer.serialize(key);
+      int keyLength = buffer.getLength();
+      if (keyLength < 0) {
+        throw new IOException("Negative key-length not allowed: " + keyLength + 
+                              " for " + key);
+      }     
+      
+      if(keyLength == previous.getLength()) {
+        sameKey = (BufferUtils.compare(previous, buffer) == 0);       
+      }
+      
+      if(!sameKey) {
+        BufferUtils.copy(buffer, previous);
+      }
+
+      // Append the 'value'
+      valueSerializer.serialize(value);
+      int valueLength = buffer.getLength() - keyLength;
+      if (valueLength < 0) {
+        throw new IOException("Negative value-length not allowed: " + 
+                              valueLength + " for " + value);
+      }
+      
+      if(sameKey) {        
+        WritableUtils.writeVInt(out, RLE_MARKER);                   // Same key as previous
+        WritableUtils.writeVInt(out, valueLength);                  // value length
+        out.write(buffer.getData(), keyLength, buffer.getLength()); // only the value
+        // Update bytes written
+        decompressedBytesWritten += 0 + valueLength + 
+                                    WritableUtils.getVIntSize(RLE_MARKER) + 
+                                    WritableUtils.getVIntSize(valueLength);
+      } else {        
+        // Write the record out        
+        WritableUtils.writeVInt(out, keyLength);                  // key length
+        WritableUtils.writeVInt(out, valueLength);                // value length
+        out.write(buffer.getData(), 0, buffer.getLength());       // data
+        // Update bytes written
+        decompressedBytesWritten += keyLength + valueLength + 
+                                    WritableUtils.getVIntSize(keyLength) + 
+                                    WritableUtils.getVIntSize(valueLength);
+      }
+
+      // Reset
+      buffer.reset();
+      
+      
+      ++numRecordsWritten;
+    }
+    
+    public void append(DataInputBuffer key, DataInputBuffer value)
+    throws IOException {
+      int keyLength = key.getLength() - key.getPosition();
+      if (keyLength < 0) {
+        throw new IOException("Negative key-length not allowed: " + keyLength + 
+                              " for " + key);
+      }
+      
+      int valueLength = value.getLength() - value.getPosition();
+      if (valueLength < 0) {
+        throw new IOException("Negative value-length not allowed: " + 
+                              valueLength + " for " + value);
+      }
+      
+      boolean sameKey = false;
+      
+      if(rle && keyLength == previous.getLength()) {
+        sameKey = (keyLength != 0) && (BufferUtils.compare(previous, key) == 0);        
+      }
+      
+      if(rle && sameKey) {
+        WritableUtils.writeVInt(out, RLE_MARKER);
+        WritableUtils.writeVInt(out, valueLength);        
+        out.write(value.getData(), value.getPosition(), valueLength);
+
+        // Update bytes written
+        decompressedBytesWritten += 0 + valueLength
+            + WritableUtils.getVIntSize(RLE_MARKER)
+            + WritableUtils.getVIntSize(valueLength);
+      } else {
+        WritableUtils.writeVInt(out, keyLength);
+        WritableUtils.writeVInt(out, valueLength);
+        out.write(key.getData(), key.getPosition(), keyLength);
+        out.write(value.getData(), value.getPosition(), valueLength);
+
+        // Update bytes written
+        decompressedBytesWritten += keyLength + valueLength
+            + WritableUtils.getVIntSize(keyLength)
+            + WritableUtils.getVIntSize(valueLength);
+                
+        BufferUtils.copy(key, previous);        
+      }
+      ++numRecordsWritten;
+    }
+    
+    // Required for mark/reset
+    public DataOutputStream getOutputStream () {
+      return out;
+    }
+    
+    // Required for mark/reset
+    public void updateCountersForExternalAppend(long length) {
+      ++numRecordsWritten;
+      decompressedBytesWritten += length;
+    }
+    
+    public long getRawLength() {
+      return decompressedBytesWritten;
+    }
+    
+    public long getCompressedLength() {
+      return compressedBytesWritten;
+    }
+    
+    public void setRLE(boolean rle) {
+      this.rle = rle;
+      previous.reset();
+    }
+
+  }
+
+  /**
+   * <code>IFile.Reader</code> to read intermediate map-outputs. 
+   */
+  @InterfaceAudience.Private
+  @InterfaceStability.Unstable
+  public static class Reader {
+    
+    public enum KeyState {NO_KEY, NEW_KEY, SAME_KEY};
+    
+    private static final int DEFAULT_BUFFER_SIZE = 128*1024;
+
+    // Count records read from disk
+    private long numRecordsRead = 0;
+    private final TezCounter readRecordsCounter;
+
+    final InputStream in;        // Possibly decompressed stream that we read
+    Decompressor decompressor;
+    public long bytesRead = 0;
+    protected final long fileLength;
+    protected boolean eof = false;
+    final IFileInputStream checksumIn;
+    
+    protected byte[] buffer = null;
+    protected int bufferSize = DEFAULT_BUFFER_SIZE;
+    protected DataInputStream dataIn;
+
+    protected int recNo = 1;
+    protected int prevKeyLength;
+    protected int currentKeyLength;
+    protected int currentValueLength;
+    byte keyBytes[] = new byte[0];
+    
+    
+    /**
+     * Construct an IFile Reader.
+     * 
+     * @param conf the configuration
+     * @param fs  FileSystem
+     * @param file Path of the file to be opened. This file should have
+     *             checksum bytes for the data at the end of the file.
+     * @param codec codec
+     * @param readsCounter Counter for records read from disk
+     * @throws IOException
+     */
+    public Reader(Configuration conf, FileSystem fs, Path file,
+                  CompressionCodec codec,
+                  TezCounter readsCounter) throws IOException {
+      this(conf, fs.open(file), 
+           fs.getFileStatus(file).getLen(),
+           codec, readsCounter);
+    }
+
+    /**
+     * Construct an IFile Reader.
+     * 
+     * @param conf the configuration
+     * @param in   The input stream
+     * @param length Length of the data in the stream, including the checksum
+     *               bytes.
+     * @param codec codec
+     * @param readsCounter Counter for records read from disk
+     * @throws IOException
+     */
+    public Reader(Configuration conf, FSDataInputStream in, long length, 
+                  CompressionCodec codec,
+                  TezCounter readsCounter) throws IOException {
+      readRecordsCounter = readsCounter;
+      checksumIn = new IFileInputStream(in,length, conf);
+      if (codec != null) {
+        decompressor = CodecPool.getDecompressor(codec);
+        if (decompressor != null) {
+          this.in = codec.createInputStream(checksumIn, decompressor);
+        } else {
+          LOG.warn("Could not obtain decompressor from CodecPool");
+          this.in = checksumIn;
+        }
+      } else {
+        this.in = checksumIn;
+      }
+      this.dataIn = new DataInputStream(this.in);
+      this.fileLength = length;
+      
+      if (conf != null) {
+        bufferSize = conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE);
+      }
+    }
+    
+    public long getLength() { 
+      return fileLength - checksumIn.getSize();
+    }
+    
+    public long getPosition() throws IOException {    
+      return checksumIn.getPosition(); 
+    }
+    
+    /**
+     * Read up to len bytes into buf starting at offset off.
+     * 
+     * @param buf buffer 
+     * @param off offset
+     * @param len length of buffer
+     * @return the no. of bytes read
+     * @throws IOException
+     */
+    private int readData(byte[] buf, int off, int len) throws IOException {
+      int bytesRead = 0;
+      while (bytesRead < len) {
+        int n = IOUtils.wrappedReadForCompressedData(in, buf, off + bytesRead,
+            len - bytesRead);
+        if (n < 0) {
+          return bytesRead;
+        }
+        bytesRead += n;
+      }
+      return len;
+    }
+    
+    protected boolean positionToNextRecord(DataInput dIn) throws IOException {
+      // Sanity check
+      if (eof) {
+        throw new EOFException("Completed reading " + bytesRead);
+      }
+      
+      // Read key and value lengths
+      prevKeyLength = currentKeyLength;
+      currentKeyLength = WritableUtils.readVInt(dIn);
+      currentValueLength = WritableUtils.readVInt(dIn);
+      bytesRead += WritableUtils.getVIntSize(currentKeyLength) +
+                   WritableUtils.getVIntSize(currentValueLength);
+      
+      // Check for EOF
+      if (currentKeyLength == EOF_MARKER && currentValueLength == EOF_MARKER) {
+        eof = true;
+        return false;
+      }      
+      
+      // Sanity check
+      if (currentKeyLength != RLE_MARKER && currentKeyLength < 0) {
+        throw new IOException("Rec# " + recNo + ": Negative key-length: " + 
+                              currentKeyLength);
+      }
+      if (currentValueLength < 0) {
+        throw new IOException("Rec# " + recNo + ": Negative value-length: " + 
+                              currentValueLength);
+      }
+            
+      return true;
+    }
+    
+    public boolean nextRawKey(DataInputBuffer key) throws IOException {
+      return readRawKey(key) != KeyState.NO_KEY;
+    }
+    
+    public KeyState readRawKey(DataInputBuffer key) throws IOException {
+      if (!positionToNextRecord(dataIn)) {
+        return KeyState.NO_KEY;
+      }
+      if(currentKeyLength == RLE_MARKER) {
+        currentKeyLength = prevKeyLength;
+        // no data to read
+        key.reset(keyBytes, currentKeyLength);
+        return KeyState.SAME_KEY;
+      }
+      if (keyBytes.length < currentKeyLength) {
+        keyBytes = new byte[currentKeyLength << 1];
+      }
+      int i = readData(keyBytes, 0, currentKeyLength);
+      if (i != currentKeyLength) {
+        throw new IOException ("Asked for " + currentKeyLength + " Got: " + i);
+      }
+      key.reset(keyBytes, currentKeyLength);
+      bytesRead += currentKeyLength;
+      return KeyState.NEW_KEY;
+    }
+    
+    public void nextRawValue(DataInputBuffer value) throws IOException {
+      final byte[] valBytes = 
+        ((value.getData().length < currentValueLength) || (value.getData() == keyBytes))
+        ? new byte[currentValueLength << 1]
+        : value.getData();
+      int i = readData(valBytes, 0, currentValueLength);
+      if (i != currentValueLength) {
+        throw new IOException ("Asked for " + currentValueLength + " Got: " + i);
+      }
+      value.reset(valBytes, currentValueLength);
+      
+      // Record the bytes read
+      bytesRead += currentValueLength;
+
+      ++recNo;
+      ++numRecordsRead;
+    }
+    
+    public void close() throws IOException {
+      // Close the underlying stream
+      in.close();
+      
+      // Release the buffer
+      dataIn = null;
+      buffer = null;
+      if(readRecordsCounter != null) {
+        readRecordsCounter.increment(numRecordsRead);
+      }
+
+      // Return the decompressor
+      if (decompressor != null) {
+        decompressor.reset();
+        CodecPool.returnDecompressor(decompressor);
+        decompressor = null;
+      }
+    }
+    
+    public void reset(int offset) {
+      return;
+    }
+
+    public void disableChecksumValidation() {
+      checksumIn.disableChecksumValidation();
+    }
+
+  }    
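+
+  // End-to-end usage sketch (illustrative; Text and a local path are
+  // assumptions, codec/counters are optional and passed as null):
+  //   IFile.Writer w = new IFile.Writer(conf, fs, path,
+  //       Text.class, Text.class, null, null);
+  //   w.append(new Text("k"), new Text("v"));
+  //   w.close();  // appends EOF markers and the checksum trailer
+  //   IFile.Reader r = new IFile.Reader(conf, fs, path, null, null);
+  //   DataInputBuffer k = new DataInputBuffer(), v = new DataInputBuffer();
+  //   while (r.nextRawKey(k)) { r.nextRawValue(v); /* deserialize k, v */ }
+  //   r.close();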
+}

Added: incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/IFileInputStream.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/IFileInputStream.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/IFileInputStream.java (added)
+++ incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/IFileInputStream.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,276 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.common.sort.impl;
+
+import java.io.EOFException;
+import java.io.FileDescriptor;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ChecksumException;
+import org.apache.hadoop.fs.HasFileDescriptor;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.ReadaheadPool;
+import org.apache.hadoop.io.ReadaheadPool.ReadaheadRequest;
+import org.apache.hadoop.util.DataChecksum;
+import org.apache.tez.common.TezJobConfig;
+/**
+ * A checksum input stream, used for IFiles.
+ * Used to validate the checksum of files created by {@link IFileOutputStream}. 
+*/
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class IFileInputStream extends InputStream {
+  
+  private final InputStream in; //The input stream to be verified for checksum.
+  private final FileDescriptor inFd; // the file descriptor, if it is known
+  private final long length; //The total length of the input file
+  private final long dataLength;
+  private DataChecksum sum;
+  private long currentOffset = 0;
+  private final byte b[] = new byte[1];
+  private byte csum[] = null;
+  private int checksumSize;
+  private byte[] buffer;
+  private int offset;
+
+  private ReadaheadRequest curReadahead = null;
+  private ReadaheadPool raPool = ReadaheadPool.getInstance();
+  private boolean readahead;
+  private int readaheadLength;
+
+  public static final Log LOG = LogFactory.getLog(IFileInputStream.class);
+
+  private boolean disableChecksumValidation = false;
+  
+  /**
+   * Create a checksum input stream that validates data against a trailing checksum.
+   * @param in The input stream to be verified for checksum.
+   * @param len The length of the input stream including checksum bytes.
+   */
+  public IFileInputStream(InputStream in, long len, Configuration conf) {
+    this.in = in;
+    this.inFd = getFileDescriptorIfAvail(in);
+    sum = DataChecksum.newDataChecksum(DataChecksum.Type.CRC32, 
+        Integer.MAX_VALUE);
+    checksumSize = sum.getChecksumSize();
+    buffer = new byte[4096];
+    offset = 0;
+    length = len;
+    dataLength = length - checksumSize;
+
+    conf = (conf != null) ? conf : new Configuration();
+    readahead = conf.getBoolean(TezJobConfig.TEZ_ENGINE_IFILE_READAHEAD,
+        TezJobConfig.DEFAULT_TEZ_ENGINE_IFILE_READAHEAD);
+    readaheadLength = conf.getInt(TezJobConfig.TEZ_ENGINE_IFILE_READAHEAD_BYTES,
+        TezJobConfig.DEFAULT_TEZ_ENGINE_IFILE_READAHEAD_BYTES);
+
+    doReadahead();
+  }
+
+  private static FileDescriptor getFileDescriptorIfAvail(InputStream in) {
+    FileDescriptor fd = null;
+    try {
+      if (in instanceof HasFileDescriptor) {
+        fd = ((HasFileDescriptor)in).getFileDescriptor();
+      } else if (in instanceof FileInputStream) {
+        fd = ((FileInputStream)in).getFD();
+      }
+    } catch (IOException e) {
+      LOG.info("Unable to determine FileDescriptor", e);
+    }
+    return fd;
+  }
+
+  /**
+   * Close the input stream. Note that we need to read to the end of the
+   * stream to validate the checksum.
+   */
+  @Override
+  public void close() throws IOException {
+
+    if (curReadahead != null) {
+      curReadahead.cancel();
+    }
+    if (currentOffset < dataLength) {
+      byte[] t = new byte[Math.min((int)
+            (Integer.MAX_VALUE & (dataLength - currentOffset)), 32 * 1024)];
+      while (currentOffset < dataLength) {
+        int n = read(t, 0, t.length);
+        if (0 == n) {
+          throw new EOFException("Could not validate checksum");
+        }
+      }
+    }
+    in.close();
+  }
+  
+  @Override
+  public long skip(long n) throws IOException {
+    throw new IOException("Skip not supported for IFileInputStream");
+  }
+  
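+  /**
+   * @return the number of payload bytes consumed so far, capped at the
+   *         payload length (checksum bytes are not counted).
+   */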
+  public long getPosition() {
+    return (currentOffset >= dataLength) ? dataLength : currentOffset;
+  }
+  
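+  /**
+   * @return the size of the trailing checksum in bytes (4 for CRC32),
+   *         not the size of the stream.
+   */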
+  public long getSize() {
+    return checksumSize;
+  }
+
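+  /**
+   * Fold bytes into the running checksum. Small updates are staged in a
+   * local 4KB buffer so that DataChecksum.update() sees larger chunks.
+   */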
+  private void checksum(byte[] b, int off, int len) {
+    if(len >= buffer.length) {
+      sum.update(buffer, 0, offset);
+      offset = 0;
+      sum.update(b, off, len);
+      return;
+    }
+    final int remaining = buffer.length - offset;
+    if(len > remaining) {
+      sum.update(buffer, 0, offset);
+      offset = 0;
+    }
+    /* now we should have len < buffer.length */
+    System.arraycopy(b, off, buffer, offset, len);
+    offset += len;
+  }
+  
+  /**
+   * Read bytes from the stream.
+   * At EOF, the checksum is validated, but the checksum
+   * bytes are not passed back in the buffer.
+   */
+  @Override
+  public int read(byte[] b, int off, int len) throws IOException {
+
+    if (currentOffset >= dataLength) {
+      return -1;
+    }
+
+    doReadahead();
+
+    return doRead(b, off, len);
+  }
+
+  private void doReadahead() {
+    if (raPool != null && inFd != null && readahead) {
+      curReadahead = raPool.readaheadStream(
+          "ifile", inFd,
+          currentOffset, readaheadLength, dataLength,
+          curReadahead);
+    }
+  }
+
+  /**
+   * Read bytes from the stream.
+   * At EOF, the checksum is validated and sent back
+   * as the last four bytes of the buffer. The caller should handle
+   * these bytes appropriately.
+   */
+  public int readWithChecksum(byte[] b, int off, int len) throws IOException {
+
+    if (currentOffset == length) {
+      return -1;
+    }
+    else if (currentOffset >= dataLength) {
+      // If the previous read drained off all the data, then just return
+      // the checksum now. Note that checksum validation would have 
+      // happened in the earlier read
+      int lenToCopy = (int) (checksumSize - (currentOffset - dataLength));
+      if (len < lenToCopy) {
+        lenToCopy = len;
+      }
+      System.arraycopy(csum, (int) (currentOffset - dataLength), b, off, 
+          lenToCopy);
+      currentOffset += lenToCopy;
+      return lenToCopy;
+    }
+
+    int bytesRead = doRead(b, off, len);
+
+    if (currentOffset == dataLength) {
+      if (len >= bytesRead + checksumSize) {
+        System.arraycopy(csum, 0, b, off + bytesRead, checksumSize);
+        bytesRead += checksumSize;
+        currentOffset += checksumSize;
+      }
+    }
+    return bytesRead;
+  }
+
+  private int doRead(byte[] b, int off, int len) throws IOException {
+
+    // If we are trying to read past the end of data, just read
+    // the leftover data. Subtract on longs before casting so streams
+    // longer than Integer.MAX_VALUE are handled correctly.
+    if (currentOffset + len > dataLength) {
+      len = (int) (dataLength - currentOffset);
+    }
+
+    int bytesRead = in.read(b, off, len);
+
+    if (bytesRead < 0) {
+      throw new ChecksumException("Checksum Error", 0);
+    }
+
+    checksum(b, off, bytesRead);
+
+    currentOffset += bytesRead;
+
+    if (disableChecksumValidation) {
+      return bytesRead;
+    }
+    
+    if (currentOffset == dataLength) {
+      // The last four bytes are the checksum. Strip them off and verify.
+      sum.update(buffer, 0, offset);
+      csum = new byte[checksumSize];
+      IOUtils.readFully(in, csum, 0, checksumSize);
+      if (!sum.compare(csum, 0)) {
+        throw new ChecksumException("Checksum Error", 0);
+      }
+    }
+    return bytesRead;
+  }
+
+
+  @Override
+  public int read() throws IOException {
+    b[0] = 0;
+    int l = read(b, 0, 1);
+    if (l < 0) {
+      return l;
+    }
+
+    // Widen b[0] to an int so the high bit of the byte is not
+    // misinterpreted as a sign bit.
+    int result = 0xFF & b[0];
+    return result;
+  }
+
+  public byte[] getChecksum() {
+    return csum;
+  }
+
+  void disableChecksumValidation() {
+    disableChecksumValidation = true;
+  }
+}
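
Two behaviors of IFileInputStream are easy to miss: read() strips the
trailing checksum bytes from what callers see, and close() drains any
unread payload so validation always runs. A minimal usage sketch
(illustration only, not part of this commit; the file name is
hypothetical):

    import java.io.File;
    import java.io.FileInputStream;
    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.tez.engine.common.sort.impl.IFileInputStream;

    public class IFileReadSketch {
      public static void main(String[] args) throws IOException {
        File f = new File("part-0.ifile");   // hypothetical path
        // len must be the total stream length: payload plus checksum bytes.
        IFileInputStream in = new IFileInputStream(
            new FileInputStream(f), f.length(), new Configuration());
        try {
          byte[] buf = new byte[4096];
          int n;
          while ((n = in.read(buf, 0, buf.length)) != -1) {
            // consume n verified payload bytes; the trailing checksum
            // bytes are never copied into buf
          }
        } finally {
          // close() reads to the end of the payload, so a corrupt file
          // surfaces as ChecksumException (an IOException) at the latest here
          in.close();
        }
      }
    }

readWithChecksum(), by contrast, hands the four raw checksum bytes back
to the caller after the payload, which lets a consumer relay the exact
on-disk bytes without recomputing the CRC.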

Added: incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/IFileOutputStream.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/IFileOutputStream.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/IFileOutputStream.java (added)
+++ incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/IFileOutputStream.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.common.sort.impl;
+
+import java.io.FilterOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.util.DataChecksum;
+/**
+ * A checksum output stream.
+ * The checksum of the stream's contents is calculated and
+ * appended to the end of the file when the stream is closed.
+ * Used for IFiles.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class IFileOutputStream extends FilterOutputStream {
+  
+  private static final Log LOG = LogFactory.getLog(IFileOutputStream.class);
+  
+  /**
+   * The checksum computed over the bytes written to the stream.
+   */
+  private final DataChecksum sum;
+  private byte[] barray;
+  private byte[] buffer;
+  private int offset;
+  private boolean closed = false;
+  private boolean finished = false;
+
+  /**
+   * Create a checksum output stream that writes
+   * the bytes to the given stream.
+   * @param out the stream that the checksummed bytes are written to
+   */
+  public IFileOutputStream(OutputStream out) {
+    super(out);
+    sum = DataChecksum.newDataChecksum(DataChecksum.Type.CRC32,
+        Integer.MAX_VALUE);
+    barray = new byte[sum.getChecksumSize()];
+    buffer = new byte[4096];
+    offset = 0;
+  }
+  
+  @Override
+  public void close() throws IOException {
+    if (closed) {
+      return;
+    }
+    closed = true;
+    finish();
+    out.close();
+  }
+
+  /**
+   * Finishes writing data to the output stream, by writing
+   * the checksum bytes to the end. The underlying stream is not closed.
+   * @throws IOException
+   */
+  public void finish() throws IOException {
+    if (finished) {
+      return;
+    }
+    finished = true;
+    sum.update(buffer, 0, offset);
+    sum.writeValue(barray, 0, false);
+    out.write(barray, 0, sum.getChecksumSize());
+    out.flush();
+  }
+
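+  /**
+   * Fold bytes into the running checksum. Small updates are staged in a
+   * local 4KB buffer so that DataChecksum.update() sees larger chunks.
+   */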
+  private void checksum(byte[] b, int off, int len) {
+    if(len >= buffer.length) {
+      sum.update(buffer, 0, offset);
+      offset = 0;
+      sum.update(b, off, len);
+      return;
+    }
+    final int remaining = buffer.length - offset;
+    if(len > remaining) {
+      sum.update(buffer, 0, offset);
+      offset = 0;
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("checksum update:" +
+          " off=" + off +
+          " bufferOffset=" + offset +
+          " len=" + len);
+    }
+    /* now we should have len < buffer.length */
+    System.arraycopy(b, off, buffer, offset, len);
+    offset += len;
+  }
+
+  /**
+   * Write bytes to the stream.
+   */
+  @Override
+  public void write(byte[] b, int off, int len) throws IOException {
+    checksum(b, off, len);
+    out.write(b, off, len);
+  }
+ 
+  @Override
+  public void write(int b) throws IOException {
+    barray[0] = (byte) (b & 0xFF);
+    write(barray, 0, 1);
+  }
+
+}
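
The on-disk layout is simply the payload bytes followed by the 4-byte
CRC32 value that finish() appends, so a stream written by
IFileOutputStream can be read back and verified by IFileInputStream
above. A minimal round-trip sketch (illustration only, not part of this
commit; the path and payload are hypothetical):

    import java.io.File;
    import java.io.FileInputStream;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.tez.engine.common.sort.impl.IFileInputStream;
    import org.apache.tez.engine.common.sort.impl.IFileOutputStream;

    public class IFileRoundTripSketch {
      public static void main(String[] args) throws IOException {
        File f = new File("roundtrip.ifile");    // hypothetical path
        byte[] payload = "hello ifile".getBytes();

        // Write: every byte is folded into the CRC32; close() invokes
        // finish(), which appends the 4-byte checksum and flushes.
        IFileOutputStream out =
            new IFileOutputStream(new FileOutputStream(f));
        try {
          out.write(payload, 0, payload.length);
        } finally {
          out.close();
        }
        // f.length() is now payload.length + 4.

        // Read back: pass the total length, checksum bytes included. A
        // corrupted file raises ChecksumException once the payload drains.
        IFileInputStream in = new IFileInputStream(
            new FileInputStream(f), f.length(), new Configuration());
        try {
          byte[] buf = new byte[payload.length];
          int n = in.read(buf, 0, buf.length);   // payload bytes only
        } finally {
          in.close();                            // validates the CRC32
        }
      }
    }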


