tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject tez git commit: TEZ-2714. Fix comments from review - part 3. (sseth)
Date Fri, 14 Aug 2015 21:20:34 GMT
Repository: tez
Updated Branches:
  refs/heads/TEZ-2003 ded95e59e -> da0f93872


TEZ-2714. Fix comments from review - part 3. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/da0f9387
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/da0f9387
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/da0f9387

Branch: refs/heads/TEZ-2003
Commit: da0f93872f0c6f233a48f16c978ba76b44523001
Parents: ded95e5
Author: Siddharth Seth <sseth@apache.org>
Authored: Fri Aug 14 14:20:13 2015 -0700
Committer: Siddharth Seth <sseth@apache.org>
Committed: Fri Aug 14 14:20:13 2015 -0700

----------------------------------------------------------------------
 TEZ-2003-CHANGES.txt                            |   1 +
 .../api/ServicePluginsDescriptor.java           |  20 +++
 .../dag/app/rm/TaskSchedulerEventHandler.java   |   5 +-
 .../tez/dag/app/rm/container/AMContainer.java   |   1 -
 .../apache/tez/dag/app/MockDAGAppMaster.java    |  20 ++-
 .../app/rm/TestTaskSchedulerEventHandler.java   |   2 +-
 .../dag/app/rm/TestTaskSchedulerHelpers.java    |   5 +-
 .../tez/shufflehandler/FadvisedChunkedFile.java |  78 ---------
 .../tez/shufflehandler/FadvisedFileRegion.java  | 160 -------------------
 .../apache/tez/shufflehandler/IndexCache.java   |  12 +-
 .../tez/shufflehandler/ShuffleHandler.java      |  80 +---------
 11 files changed, 45 insertions(+), 339 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/da0f9387/TEZ-2003-CHANGES.txt
----------------------------------------------------------------------
diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt
index 8a8e257..fed203a 100644
--- a/TEZ-2003-CHANGES.txt
+++ b/TEZ-2003-CHANGES.txt
@@ -49,5 +49,6 @@ ALL CHANGES:
   TEZ-2707. Fix comments from reviews - part 2.
   TEZ-2713. Add tests for node handling when there's multiple schedulers.
   TEZ-2721. rebase 08/14
+  TEZ-2714. Fix comments from review - part 3.
 
 INCOMPATIBLE CHANGES:

http://git-wip-us.apache.org/repos/asf/tez/blob/da0f9387/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ServicePluginsDescriptor.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ServicePluginsDescriptor.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ServicePluginsDescriptor.java
index 113b7db..2dabed0 100644
--- a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ServicePluginsDescriptor.java
+++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ServicePluginsDescriptor.java
@@ -19,6 +19,7 @@ import java.util.Arrays;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.tez.dag.api.TezConfiguration;
 
 /**
  * An {@link ServicePluginsDescriptor} describes the list of plugins running within the AM for
@@ -71,6 +72,13 @@ public class ServicePluginsDescriptor {
   /**
    * Create a service plugin descriptor with the provided plugins. Also allows specification of whether
    * in-AM execution is enabled. Container execution is enabled by default.
+   *
+   * Note on Uber mode: This is NOT fully supported at the moment. Tasks will be launched within the
+   * AM process itself, controlled by {@link TezConfiguration#TEZ_AM_INLINE_TASK_EXECUTION_MAX_TASKS}.
+   * The AM will need to be sized correctly for the tasks. Memory allocation to the running task
+   * cannot be controlled yet, and is the full AM heap for each task.
+   * TODO: TEZ-2722
+   *
    * @param enableUber whether to enable execution in the AM or not
    * @param taskSchedulerDescriptor the task scheduler plugin descriptors
    * @param containerLauncherDescriptors the container launcher plugin descriptors
@@ -89,6 +97,12 @@ public class ServicePluginsDescriptor {
    * Create a service plugin descriptor with the provided plugins. Also allows specification of whether
    * container execution and in-AM execution will be enabled.
    *
+   * Note on Uber mode: This is NOT fully supported at the moment. Tasks will be launched within the
+   * AM process itself, controlled by {@link TezConfiguration#TEZ_AM_INLINE_TASK_EXECUTION_MAX_TASKS}.
+   * The AM will need to be sized correctly for the tasks. Memory allocation to the running task
+   * cannot be controlled yet, and is the full AM heap for each task.
+   * TODO: TEZ-2722
+   *
    * @param enableContainers whether to enable execution in containers
    * @param enableUber whether to enable execution in the AM or not
    * @param taskSchedulerDescriptor the task scheduler plugin descriptors
@@ -108,6 +122,12 @@ public class ServicePluginsDescriptor {
    * Create a service plugin descriptor which may have in-AM execution of tasks enabled. Container
    * execution is enabled by default
    *
+   * Note on Uber mode: This is NOT fully supported at the moment. Tasks will be launched within the
+   * AM process itself, controlled by {@link TezConfiguration#TEZ_AM_INLINE_TASK_EXECUTION_MAX_TASKS}.
+   * The AM will need to be sized correctly for the tasks. Memory allocation to the running task
+   * cannot be controlled yet, and is the full AM heap for each task.
+   * TODO: TEZ-2722
+   *
    * @param enableUber whether to enable execution in the AM or not
    * @return a {@link ServicePluginsDescriptor} instance
    */

http://git-wip-us.apache.org/repos/asf/tez/blob/da0f9387/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
index 7d2e768..f189b84 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
@@ -443,7 +443,8 @@ public class TaskSchedulerEventHandler extends AbstractService implements
   }
 
   @VisibleForTesting
-  protected void instantiateScheduelrs(String host, int port, String trackingUrl, AppContext appContext) {
+  protected void instantiateSchedulers(String host, int port, String trackingUrl,
+                                       AppContext appContext) {
     // Iterate over the list and create all the taskSchedulers
     int j = 0;
     for (int i = 0; i < taskSchedulerDescriptors.length; i++) {
@@ -472,7 +473,7 @@ public class TaskSchedulerEventHandler extends AbstractService implements
     // always try to connect to AM and proxy the response. hence it wont work if the webUIService
     // is not enabled.
     String trackingUrl = (webUI != null) ? webUI.getTrackingURL() : "";
-    instantiateScheduelrs(serviceAddr.getHostName(), serviceAddr.getPort(), trackingUrl, appContext);
+    instantiateSchedulers(serviceAddr.getHostName(), serviceAddr.getPort(), trackingUrl, appContext);
 
     for (int i = 0 ; i < taskSchedulers.length ; i++) {
       taskSchedulerServiceWrappers[i].init(getConfig());

http://git-wip-us.apache.org/repos/asf/tez/blob/da0f9387/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainer.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainer.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainer.java
index 4b2d528..8f5034e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainer.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainer.java
@@ -22,7 +22,6 @@ import java.util.List;
 
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 

http://git-wip-us.apache.org/repos/asf/tez/blob/da0f9387/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
index fe3e4ef..b09eb86 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
@@ -38,6 +38,8 @@ import java.util.concurrent.atomic.AtomicLong;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.NamedEntityDescriptor;
 import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.runtime.api.impl.TezHeartbeatRequest;
+import org.apache.tez.runtime.api.impl.TezHeartbeatResponse;
 import org.apache.tez.serviceplugins.api.ContainerLaunchRequest;
 import org.apache.tez.serviceplugins.api.ContainerLauncher;
 import org.apache.tez.serviceplugins.api.ContainerLauncherContext;
@@ -56,8 +58,6 @@ import org.apache.tez.client.TezApiVersionInfo;
 import org.apache.tez.common.ContainerContext;
 import org.apache.tez.common.ContainerTask;
 import org.apache.tez.common.counters.TezCounters;
-import org.apache.tez.dag.api.TaskHeartbeatRequest;
-import org.apache.tez.dag.api.TaskHeartbeatResponse;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.app.launcher.ContainerLauncherRouter;
@@ -325,11 +325,11 @@ public class MockDAGAppMaster extends DAGAppMaster {
       }
     }
     
-    private void doHeartbeat(TaskHeartbeatRequest request, ContainerData cData) throws Exception {
+    private void doHeartbeat(TezHeartbeatRequest request, ContainerData cData) throws Exception {
       long startTime = System.nanoTime();
       long startCpuTime = threadMxBean.getCurrentThreadCpuTime();
-      TaskHeartbeatResponse response = taListener.heartbeat(request);
-      if (response.isShouldDie()) {
+      TezHeartbeatResponse response = taskCommunicator.getUmbilical().heartbeat(request);
+      if (response.shouldDie()) {
         cData.remove();
       } else {
         cData.nextFromEventId = response.getNextFromEventId();
@@ -417,9 +417,8 @@ public class MockDAGAppMaster extends DAGAppMaster {
               events.add(new TezEvent(new TaskStatusUpdateEvent(counters, progress, stats), new EventMetaData(
                   EventProducerConsumerType.SYSTEM, cData.vName, "", cData.taId),
                   MockDAGAppMaster.this.getContext().getClock().getTime()));
-              TaskHeartbeatRequest request =
-                  new TaskHeartbeatRequest(cData.cIdStr, cData.taId, events, cData.nextFromEventId, cData.nextPreRoutedFromEventId,
-                      50000);
+              TezHeartbeatRequest request = new TezHeartbeatRequest(cData.numUpdates, events,
+                  cData.nextPreRoutedFromEventId, cData.cIdStr, cData.taId, cData.nextFromEventId, 50000);
               doHeartbeat(request, cData);
             } else if (version != null && cData.taId.getId() <= version.intValue()) {
               preemptContainer(cData);
@@ -430,9 +429,8 @@ public class MockDAGAppMaster extends DAGAppMaster {
                   new TaskAttemptCompletedEvent(), new EventMetaData(
                       EventProducerConsumerType.SYSTEM, cData.vName, "", cData.taId),
                   MockDAGAppMaster.this.getContext().getClock().getTime()));
-              TaskHeartbeatRequest request =
-                  new TaskHeartbeatRequest(cData.cIdStr, cData.taId, events, cData.nextFromEventId, cData.nextPreRoutedFromEventId,
-                      10000);
+              TezHeartbeatRequest request = new TezHeartbeatRequest(++cData.numUpdates, events,
+                  cData.nextPreRoutedFromEventId, cData.cIdStr, cData.taId, cData.nextFromEventId, 10000);
               doHeartbeat(request, cData);
               cData.clear();
             }

http://git-wip-us.apache.org/repos/asf/tez/blob/da0f9387/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
index 1550085..c85be6c 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
@@ -117,7 +117,7 @@ public class TestTaskSchedulerEventHandler {
     }
 
     @Override
-    protected void instantiateScheduelrs(String host, int port, String trackingUrl,
+    protected void instantiateSchedulers(String host, int port, String trackingUrl,
                                          AppContext appContext) {
       taskSchedulers[0] = mockTaskScheduler;
       taskSchedulerServiceWrappers[0] = new ServicePluginLifecycleAbstractService<>(taskSchedulers[0]);

http://git-wip-us.apache.org/repos/asf/tez/blob/da0f9387/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
index 0746507..c13ca5a 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
@@ -147,7 +147,8 @@ class TestTaskSchedulerHelpers {
     }
 
     @Override
-    public void instantiateScheduelrs(String host, int port, String trackingUrl, AppContext appContext) {
+    public void instantiateSchedulers(String host, int port, String trackingUrl,
+                                      AppContext appContext) {
       TaskSchedulerContext taskSchedulerContext =
           new TaskSchedulerContextImpl(this, appContext, 0, trackingUrl, 1000, host, port,
               defaultPayload);
@@ -166,7 +167,7 @@ class TestTaskSchedulerHelpers {
 
     @Override
     public void serviceStart() {
-      instantiateScheduelrs("host", 0, "", appContext);
+      instantiateSchedulers("host", 0, "", appContext);
       // Init the service so that reuse configuration is picked up.
       ((AbstractService)taskSchedulerServiceWrappers[0]).init(getConfig());
       ((AbstractService)taskSchedulerServiceWrappers[0]).start();

http://git-wip-us.apache.org/repos/asf/tez/blob/da0f9387/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/FadvisedChunkedFile.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/FadvisedChunkedFile.java b/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/FadvisedChunkedFile.java
deleted file mode 100644
index 294add6..0000000
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/FadvisedChunkedFile.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.shufflehandler;
-
-import java.io.FileDescriptor;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-
-import org.apache.hadoop.io.ReadaheadPool;
-import org.apache.hadoop.io.ReadaheadPool.ReadaheadRequest;
-import org.apache.hadoop.io.nativeio.NativeIO;
-import org.jboss.netty.handler.stream.ChunkedFile;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class FadvisedChunkedFile extends ChunkedFile {
-
-  private static final Logger LOG = LoggerFactory.getLogger(FadvisedChunkedFile.class);
-
-  private final boolean manageOsCache;
-  private final int readaheadLength;
-  private final ReadaheadPool readaheadPool;
-  private final FileDescriptor fd;
-  private final String identifier;
-
-  private ReadaheadRequest readaheadRequest;
-
-  public FadvisedChunkedFile(RandomAccessFile file, long position, long count,
-      int chunkSize, boolean manageOsCache, int readaheadLength,
-      ReadaheadPool readaheadPool, String identifier) throws IOException {
-    super(file, position, count, chunkSize);
-    this.manageOsCache = manageOsCache;
-    this.readaheadLength = readaheadLength;
-    this.readaheadPool = readaheadPool;
-    this.fd = file.getFD();
-    this.identifier = identifier;
-  }
-
-  @Override
-  public Object nextChunk() throws Exception {
-    if (manageOsCache && readaheadPool != null) {
-      readaheadRequest = readaheadPool
-          .readaheadStream(identifier, fd, getCurrentOffset(), readaheadLength,
-              getEndOffset(), readaheadRequest);
-    }
-    return super.nextChunk();
-  }
-
-  @Override
-  public void close() throws Exception {
-    if (readaheadRequest != null) {
-      readaheadRequest.cancel();
-    }
-    if (manageOsCache && getEndOffset() - getStartOffset() > 0) {
-      try {
-        NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(identifier,
-            fd,
-            getStartOffset(), getEndOffset() - getStartOffset(),
-            NativeIO.POSIX.POSIX_FADV_DONTNEED);
-      } catch (Throwable t) {
-        LOG.warn("Failed to manage OS cache for " + identifier, t);
-      }
-    }
-    super.close();
-  }
-}

http://git-wip-us.apache.org/repos/asf/tez/blob/da0f9387/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/FadvisedFileRegion.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/FadvisedFileRegion.java b/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/FadvisedFileRegion.java
deleted file mode 100644
index e5392d3..0000000
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/FadvisedFileRegion.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.shufflehandler;
-
-import java.io.FileDescriptor;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-import java.nio.channels.WritableByteChannel;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.hadoop.io.ReadaheadPool;
-import org.apache.hadoop.io.ReadaheadPool.ReadaheadRequest;
-import org.apache.hadoop.io.nativeio.NativeIO;
-import org.jboss.netty.channel.DefaultFileRegion;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class FadvisedFileRegion extends DefaultFileRegion {
-
-  private static final Logger LOG = LoggerFactory.getLogger(FadvisedFileRegion.class);
-
-  private final boolean manageOsCache;
-  private final int readaheadLength;
-  private final ReadaheadPool readaheadPool;
-  private final FileDescriptor fd;
-  private final String identifier;
-  private final long count;
-  private final long position;
-  private final int shuffleBufferSize;
-  private final boolean shuffleTransferToAllowed;
-  private final FileChannel fileChannel;
-  
-  private ReadaheadRequest readaheadRequest;
-
-  public FadvisedFileRegion(RandomAccessFile file, long position, long count,
-      boolean manageOsCache, int readaheadLength, ReadaheadPool readaheadPool,
-      String identifier, int shuffleBufferSize, 
-      boolean shuffleTransferToAllowed) throws IOException {
-    super(file.getChannel(), position, count);
-    this.manageOsCache = manageOsCache;
-    this.readaheadLength = readaheadLength;
-    this.readaheadPool = readaheadPool;
-    this.fd = file.getFD();
-    this.identifier = identifier;
-    this.fileChannel = file.getChannel();
-    this.count = count;
-    this.position = position;
-    this.shuffleBufferSize = shuffleBufferSize;
-    this.shuffleTransferToAllowed = shuffleTransferToAllowed;
-  }
-
-  @Override
-  public long transferTo(WritableByteChannel target, long position)
-      throws IOException {
-    if (manageOsCache && readaheadPool != null) {
-      readaheadRequest = readaheadPool.readaheadStream(identifier, fd,
-          getPosition() + position, readaheadLength,
-          getPosition() + getCount(), readaheadRequest);
-    }
-    
-    if(this.shuffleTransferToAllowed) {
-      return super.transferTo(target, position);
-    } else {
-      return customShuffleTransfer(target, position);
-    } 
-  }
-
-  /**
-   * This method transfers data using local buffer. It transfers data from 
-   * a disk to a local buffer in memory, and then it transfers data from the 
-   * buffer to the target. This is used only if transferTo is disallowed in
-   * the configuration file. super.TransferTo does not perform well on Windows 
-   * due to a small IO request generated. customShuffleTransfer can control 
-   * the size of the IO requests by changing the size of the intermediate 
-   * buffer.
-   */
-  @VisibleForTesting
-  long customShuffleTransfer(WritableByteChannel target, long position)
-      throws IOException {
-    long actualCount = this.count - position;
-    if (actualCount < 0 || position < 0) {
-      throw new IllegalArgumentException(
-          "position out of range: " + position +
-          " (expected: 0 - " + (this.count - 1) + ')');
-    }
-    if (actualCount == 0) {
-      return 0L;
-    }
-    
-    long trans = actualCount;
-    int readSize;
-    ByteBuffer byteBuffer = ByteBuffer.allocate(this.shuffleBufferSize);
-    
-    while(trans > 0L &&
-        (readSize = fileChannel.read(byteBuffer, this.position+position)) > 0) {
-      //adjust counters and buffer limit
-      if(readSize < trans) {
-        trans -= readSize;
-        position += readSize;
-        byteBuffer.flip();
-      } else {
-        //We can read more than we need if the actualCount is not multiple 
-        //of the byteBuffer size and file is big enough. In that case we cannot
-        //use flip method but we need to set buffer limit manually to trans.
-        byteBuffer.limit((int)trans);
-        byteBuffer.position(0);
-        position += trans; 
-        trans = 0;
-      }
-      
-      //write data to the target
-      while(byteBuffer.hasRemaining()) {
-        target.write(byteBuffer);
-      }
-      
-      byteBuffer.clear();
-    }
-    
-    return actualCount - trans;
-  }
-
-  
-  @Override
-  public void releaseExternalResources() {
-    if (readaheadRequest != null) {
-      readaheadRequest.cancel();
-    }
-    super.releaseExternalResources();
-  }
-  
-  /**
-   * Call when the transfer completes successfully so we can advise the OS that
-   * we don't need the region to be cached anymore.
-   */
-  public void transferSuccessful() {
-    if (manageOsCache && getCount() > 0) {
-      try {
-        NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(identifier,
-           fd, getPosition(), getCount(),
-           NativeIO.POSIX.POSIX_FADV_DONTNEED);
-      } catch (Throwable t) {
-        LOG.warn("Failed to manage OS cache for " + identifier, t);
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/tez/blob/da0f9387/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/IndexCache.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/IndexCache.java b/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/IndexCache.java
index 5a45917..e358fcc 100644
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/IndexCache.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/IndexCache.java
@@ -1,11 +1,7 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
  *
  *     http://www.apache.org/licenses/LICENSE-2.0
  *

http://git-wip-us.apache.org/repos/asf/tez/blob/da0f9387/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/ShuffleHandler.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/ShuffleHandler.java b/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/ShuffleHandler.java
index 8cbb8c7..046ce18 100644
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/ShuffleHandler.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/ShuffleHandler.java
@@ -33,7 +33,6 @@ import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.net.InetSocketAddress;
 import java.net.URL;
-import java.nio.ByteBuffer;
 import java.nio.channels.ClosedChannelException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -54,15 +53,11 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.DataInputByteBuffer;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.ReadaheadPool;
 import org.apache.hadoop.io.SecureIOUtils;
-import org.apache.hadoop.metrics2.annotation.Metric;
-import org.apache.hadoop.metrics2.annotation.Metrics;
-import org.apache.hadoop.metrics2.lib.MutableCounterInt;
-import org.apache.hadoop.metrics2.lib.MutableCounterLong;
-import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
+import org.apache.hadoop.mapred.FadvisedChunkedFile;
+import org.apache.hadoop.mapred.FadvisedFileRegion;
 import org.apache.hadoop.security.ssl.SSLFactory;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.Shell;
@@ -199,27 +194,6 @@ public class ShuffleHandler {
   private static final AtomicBoolean initing = new AtomicBoolean(false);
   private static ShuffleHandler INSTANCE;
 
-  @Metrics(about="Shuffle output metrics", context="mapred")
-  static class ShuffleMetrics implements ChannelFutureListener {
-    @Metric("Shuffle output in bytes")
-    MutableCounterLong shuffleOutputBytes;
-    @Metric("# of failed shuffle outputs")
-    MutableCounterInt shuffleOutputsFailed;
-    @Metric("# of succeeeded shuffle outputs")
-    MutableCounterInt shuffleOutputsOK;
-    @Metric("# of current shuffle connections")
-    MutableGaugeInt shuffleConnections;
-
-    @Override
-    public void operationComplete(ChannelFuture future) throws Exception {
-      if (future.isSuccess()) {
-        shuffleOutputsOK.incr();
-      } else {
-        shuffleOutputsFailed.incr();
-      }
-      shuffleConnections.decr();
-    }
-  }
 
   public ShuffleHandler(Configuration conf) {
     this.conf = conf;
@@ -299,57 +273,11 @@ public class ShuffleHandler {
   }
 
   public static ShuffleHandler get() {
-    Preconditions.checkState(started.get(), "ShuffleHandler must be started before invoking started");
+    Preconditions.checkState(started.get(),
+        "ShuffleHandler must be started before invoking started");
     return INSTANCE;
   }
 
-  /**
-   * Serialize the shuffle port into a ByteBuffer for use later on.
-   * @param port the port to be sent to the ApplciationMaster
-   * @return the serialized form of the port.
-   */
-  public static ByteBuffer serializeMetaData(int port) throws IOException {
-    //TODO these bytes should be versioned
-    DataOutputBuffer port_dob = new DataOutputBuffer();
-    port_dob.writeInt(port);
-    return ByteBuffer.wrap(port_dob.getData(), 0, port_dob.getLength());
-  }
-
-  /**
-   * A helper function to deserialize the metadata returned by ShuffleHandler.
-   * @param meta the metadata returned by the ShuffleHandler
-   * @return the port the Shuffle Handler is listening on to serve shuffle data.
-   */
-  public static int deserializeMetaData(ByteBuffer meta) throws IOException {
-    //TODO this should be returning a class not just an int
-    DataInputByteBuffer in = new DataInputByteBuffer();
-    in.reset(meta);
-    int port = in.readInt();
-    return port;
-  }
-
-  /**
-   * A helper function to serialize the JobTokenIdentifier to be sent to the
-   * ShuffleHandler as ServiceData.
-   * @param jobToken the job token to be used for authentication of
-   * shuffle data requests.
-   * @return the serialized version of the jobToken.
-   */
-  public static ByteBuffer serializeServiceData(Token<JobTokenIdentifier> jobToken) throws IOException {
-    //TODO these bytes should be versioned
-    DataOutputBuffer jobToken_dob = new DataOutputBuffer();
-    jobToken.write(jobToken_dob);
-    return ByteBuffer.wrap(jobToken_dob.getData(), 0, jobToken_dob.getLength());
-  }
-
-  static Token<JobTokenIdentifier> deserializeServiceData(ByteBuffer secret) throws IOException {
-    DataInputByteBuffer in = new DataInputByteBuffer();
-    in.reset(secret);
-    Token<JobTokenIdentifier> jt = new Token<JobTokenIdentifier>();
-    jt.readFields(in);
-    return jt;
-  }
-
   public int getPort() {
     return port;
   }


Mime
View raw message