tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject git commit: TEZ-716. Remove some unnecessary classes. (sseth)
Date Thu, 09 Jan 2014 18:04:03 GMT
Updated Branches:
  refs/heads/master 2c8ee6cc4 -> 74c622591


TEZ-716. Remove some unnecessary classes. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/74c62259
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/74c62259
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/74c62259

Branch: refs/heads/master
Commit: 74c6225917e4fd1f298dce5d9a947e9c98dc0756
Parents: 2c8ee6c
Author: Siddharth Seth <sseth@apache.org>
Authored: Thu Jan 9 10:03:45 2014 -0800
Committer: Siddharth Seth <sseth@apache.org>
Committed: Thu Jan 9 10:03:45 2014 -0800

----------------------------------------------------------------------
 .../java/org/apache/tez/common/IDUtils.java     |  92 --------------
 .../records/ProceedToCompletionResponse.java    |  76 ------------
 .../hadoop/mapred/TezMRTypeConverter.java       |  42 -------
 .../org/apache/hadoop/mapred/WrappedJvmID.java  |  30 -----
 .../mapred/WrappedPeriodicStatsAccumulator.java |  33 -----
 .../mapred/WrappedProgressSplitsBlock.java      |  67 -----------
 .../org/apache/tez/dag/app/DAGAppMaster.java    |  13 --
 .../dag/app/TaskAttemptListenerImpTezDag.java   |  53 ---------
 .../app/taskclean/TaskAttemptCleanupEvent.java  |  69 -----------
 .../tez/dag/app/taskclean/TaskCleaner.java      |  30 -----
 .../TaskCleanerContainerCompletedEvent.java     |  36 ------
 .../tez/dag/app/taskclean/TaskCleanerImpl.java  | 119 -------------------
 .../tez/dag/app/taskclean/TaskCleanupEvent.java |  58 ---------
 .../tez/dag/app/taskclean/package-info.java     |  20 ----
 .../tez/common/TezTaskUmbilicalProtocol.java    |   5 -
 .../common/shuffle/impl/MergeManager.java       |   2 +
 16 files changed, 2 insertions(+), 743 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/74c62259/tez-common/src/main/java/org/apache/tez/common/IDUtils.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/IDUtils.java b/tez-common/src/main/java/org/apache/tez/common/IDUtils.java
deleted file mode 100644
index d15b0d3..0000000
--- a/tez-common/src/main/java/org/apache/tez/common/IDUtils.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.common;
-
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.tez.dag.records.TezDAGID;
-import org.apache.tez.dag.records.TezID;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.dag.records.TezTaskID;
-import org.apache.tez.dag.records.TezVertexID;
-
-public class IDUtils {
-
-  /** Construct a TaskID object from given string 
-   * @return constructed TaskID object or null if the given String is null
-   * @throws IllegalArgumentException if the given string is malformed
-   */
-  public static TezTaskID toTaskId(String str) 
-    throws IllegalArgumentException {
-    if(str == null)
-      return null;
-    String exceptionMsg = null;
-    try {
-      String[] parts = str.split("_");
-      if(parts.length == 6) {
-        if(parts[0].equals(TezTaskID.TASK)) {
-          ApplicationId appId = ApplicationId.newInstance(
-              Long.valueOf(parts[1]), Integer.parseInt(parts[2]));
-          TezDAGID dagId = TezDAGID.getInstance(appId, Integer.parseInt(parts[3]));
-          TezVertexID vId = TezVertexID.getInstance(dagId, Integer.parseInt(parts[4]));
-          return TezTaskID.getInstance(vId, Integer.parseInt(parts[5]));
-        } else
-          exceptionMsg = "Bad TaskType identifier. TaskId string : " + str
-          + " is not properly formed.";
-      }
-    }catch (Exception ex) {//fall below
-    }
-    if (exceptionMsg == null) {
-      exceptionMsg = "TaskId string : " + str + " is not properly formed";
-    }
-    throw new IllegalArgumentException(exceptionMsg);
-  }
-  
-  /** Construct a TaskAttemptID object from given string 
-   * @return constructed TaskAttemptID object or null if the given String is null
-   * @throws IllegalArgumentException if the given string is malformed
-   */
-  public static TezTaskAttemptID toTaskAttemptId(String str
-                                      ) throws IllegalArgumentException {
-    if(str == null)
-      return null;
-    String exceptionMsg = null;
-    try {
-      String[] parts = str.split(Character.toString(TezID.SEPARATOR));
-      if(parts.length == 7) {
-        if(parts[0].equals(TezTaskAttemptID.ATTEMPT)) {
-          ApplicationId appId = ApplicationId.newInstance(
-              Long.valueOf(parts[1]), Integer.parseInt(parts[2]));
-          TezDAGID dagId = TezDAGID.getInstance(appId, Integer.parseInt(parts[3]));
-          TezVertexID vId = TezVertexID.getInstance(dagId, Integer.parseInt(parts[4]));
-          TezTaskID tId = TezTaskID.getInstance(vId, Integer.parseInt(parts[5]));
-          return TezTaskAttemptID.getInstance(tId, Integer.parseInt(parts[6]));
-        } else
-          exceptionMsg = "Bad TaskType identifier. TaskAttemptId string : "
-              + str + " is not properly formed.";
-      }
-    } catch (Exception ex) {
-      //fall below
-    }
-    if (exceptionMsg == null) {
-      exceptionMsg = "TaskAttemptId string : " + str
-          + " is not properly formed";
-    }
-    throw new IllegalArgumentException(exceptionMsg);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/74c62259/tez-common/src/main/java/org/apache/tez/common/records/ProceedToCompletionResponse.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/records/ProceedToCompletionResponse.java
b/tez-common/src/main/java/org/apache/tez/common/records/ProceedToCompletionResponse.java
deleted file mode 100644
index c6fdeca..0000000
--- a/tez-common/src/main/java/org/apache/tez/common/records/ProceedToCompletionResponse.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.tez.common.records;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.io.Writable;
-
-
-public class ProceedToCompletionResponse implements Writable{
-
-  private boolean shouldDie;
-  private boolean readyToProceed;
-
-  public ProceedToCompletionResponse() {
-  }
-  
-  public ProceedToCompletionResponse(boolean shouldDie, boolean readyToProceed) {
-    this.shouldDie = shouldDie;
-    this.readyToProceed = readyToProceed;
-  }
-
-  /**
-   * Indicates whether the task is required to proceed to completion, or should
-   * terminate.
-   * 
-   * @return
-   */
-  public boolean shouldDie() {
-    return this.shouldDie;
-  }
-  
-  /**
-   * Indicates whether the task is ready to proceed. Valid only if shouldDie is
-   * false.
-   * 
-   * @return
-   */
-  public boolean readyToProceed() {
-    return this.readyToProceed;
-  }
-  
-  @Override
-  public void write(DataOutput out) throws IOException {
-    out.writeBoolean(shouldDie);
-    out.writeBoolean(readyToProceed);
-  }
-
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    shouldDie = in.readBoolean();
-    readyToProceed = in.readBoolean();
-  }
-
-  @Override
-  public String toString() {
-    return "shouldDie: " + shouldDie + ", readyToProceed: " + readyToProceed;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/74c62259/tez-dag/src/main/java/org/apache/hadoop/mapred/TezMRTypeConverter.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/hadoop/mapred/TezMRTypeConverter.java b/tez-dag/src/main/java/org/apache/hadoop/mapred/TezMRTypeConverter.java
deleted file mode 100644
index a47ef40..0000000
--- a/tez-dag/src/main/java/org/apache/hadoop/mapred/TezMRTypeConverter.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.mapred;
-
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.tez.dag.records.TezDAGID;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.dag.records.TezTaskID;
-import org.apache.tez.dag.records.TezVertexID;
-
-public class TezMRTypeConverter {
-
-  // FIXME hack alert assumimg only one dag per application
-  public static TaskAttemptID fromTez(TezTaskAttemptID attemptId) {
-    TezTaskID taskId = attemptId.getTaskID();
-    TezVertexID vertexId = taskId.getVertexID();
-    TezDAGID dagId = vertexId.getDAGId();
-    return new TaskAttemptID(
-        Long.toString(dagId.getApplicationId().getClusterTimestamp()),
-        dagId.getApplicationId().getId(),
-        (vertexId.getId() == 0 ? TaskType.MAP : TaskType.REDUCE),
-        taskId.getId(), attemptId.getId());
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/74c62259/tez-dag/src/main/java/org/apache/hadoop/mapred/WrappedJvmID.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/hadoop/mapred/WrappedJvmID.java b/tez-dag/src/main/java/org/apache/hadoop/mapred/WrappedJvmID.java
deleted file mode 100644
index 2a83a26..0000000
--- a/tez-dag/src/main/java/org/apache/hadoop/mapred/WrappedJvmID.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.hadoop.mapred;
-
-/**
- * A simple wrapper for increasing the visibility.
- */
-public class WrappedJvmID extends JVMId {
-
-  public WrappedJvmID(JobID jobID, boolean mapTask, int nextInt) {
-    super(jobID, mapTask, nextInt);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/74c62259/tez-dag/src/main/java/org/apache/hadoop/mapred/WrappedPeriodicStatsAccumulator.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/hadoop/mapred/WrappedPeriodicStatsAccumulator.java
b/tez-dag/src/main/java/org/apache/hadoop/mapred/WrappedPeriodicStatsAccumulator.java
deleted file mode 100644
index e55c0ad..0000000
--- a/tez-dag/src/main/java/org/apache/hadoop/mapred/WrappedPeriodicStatsAccumulator.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.mapred;
-
-//Workaround for PeriodicStateAccumulator being package access
-public class WrappedPeriodicStatsAccumulator {
-
-  private PeriodicStatsAccumulator real;
-
-  public WrappedPeriodicStatsAccumulator(PeriodicStatsAccumulator real) {
-    this.real = real;
-  }
-  
-  public void extend(double newProgress, int newValue) {
-    real.extend(newProgress, newValue);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/74c62259/tez-dag/src/main/java/org/apache/hadoop/mapred/WrappedProgressSplitsBlock.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/hadoop/mapred/WrappedProgressSplitsBlock.java
b/tez-dag/src/main/java/org/apache/hadoop/mapred/WrappedProgressSplitsBlock.java
deleted file mode 100644
index f1a63f2..0000000
--- a/tez-dag/src/main/java/org/apache/hadoop/mapred/WrappedProgressSplitsBlock.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.mapred;
-
-// Workaround for ProgressSplitBlock being package access
-public class WrappedProgressSplitsBlock extends ProgressSplitsBlock {
-  private WrappedPeriodicStatsAccumulator wrappedProgressWallclockTime;
-  private WrappedPeriodicStatsAccumulator wrappedProgressCPUTime;
-  private WrappedPeriodicStatsAccumulator wrappedProgressVirtualMemoryKbytes;
-  private WrappedPeriodicStatsAccumulator wrappedProgressPhysicalMemoryKbytes;
-
-  public WrappedProgressSplitsBlock(int numberSplits) {
-    super(numberSplits);
-  }
-
-  public int[][] burst() {
-    return super.burst();
-  }
-
-  public WrappedPeriodicStatsAccumulator getProgressWallclockTime() {
-    if (wrappedProgressWallclockTime == null) {
-      wrappedProgressWallclockTime = new WrappedPeriodicStatsAccumulator(
-          progressWallclockTime);
-    }
-    return wrappedProgressWallclockTime;
-  }
-
-  public WrappedPeriodicStatsAccumulator getProgressCPUTime() {
-    if (wrappedProgressCPUTime == null) {
-      wrappedProgressCPUTime = new WrappedPeriodicStatsAccumulator(
-          progressCPUTime);
-    }
-    return wrappedProgressCPUTime;
-  }
-
-  public WrappedPeriodicStatsAccumulator getProgressVirtualMemoryKbytes() {
-    if (wrappedProgressVirtualMemoryKbytes == null) {
-      wrappedProgressVirtualMemoryKbytes = new WrappedPeriodicStatsAccumulator(
-          progressVirtualMemoryKbytes);
-    }
-    return wrappedProgressVirtualMemoryKbytes;
-  }
-
-  public WrappedPeriodicStatsAccumulator getProgressPhysicalMemoryKbytes() {
-    if (wrappedProgressPhysicalMemoryKbytes == null) {
-      wrappedProgressPhysicalMemoryKbytes = new WrappedPeriodicStatsAccumulator(
-          progressPhysicalMemoryKbytes);
-    }
-    return wrappedProgressPhysicalMemoryKbytes;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/74c62259/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 2147f9f..a072b5b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -120,8 +120,6 @@ import org.apache.tez.dag.app.rm.container.ContainerContextMatcher;
 import org.apache.tez.dag.app.rm.container.ContainerSignatureMatcher;
 import org.apache.tez.dag.app.rm.node.AMNodeEventType;
 import org.apache.tez.dag.app.rm.node.AMNodeMap;
-import org.apache.tez.dag.app.taskclean.TaskCleaner;
-import org.apache.tez.dag.app.taskclean.TaskCleanerImpl;
 import org.apache.tez.dag.history.DAGHistoryEvent;
 import org.apache.tez.dag.history.HistoryEventHandler;
 import org.apache.tez.dag.history.avro.HistoryEventType;
@@ -177,7 +175,6 @@ public class DAGAppMaster extends AbstractService {
   private Configuration amConf;
   private Dispatcher dispatcher;
   private ContainerLauncher containerLauncher;
-  private TaskCleaner taskCleaner;
   private ContainerHeartbeatHandler containerHeartbeatHandler;
   private TaskHeartbeatHandler taskHeartbeatHandler;
   private TaskAttemptListener taskAttemptListener;
@@ -296,10 +293,6 @@ public class DAGAppMaster extends AbstractService {
     addIfService(nodes, true);
     dispatcher.register(AMNodeEventType.class, nodes);
 
-    //service to do the task cleanup
-    taskCleaner = createTaskCleaner(context);
-    addIfService(taskCleaner, true);
-
     this.dagEventDispatcher = new DagEventDispatcher();
     this.vertexEventDispatcher = new VertexEventDispatcher();
 
@@ -310,7 +303,6 @@ public class DAGAppMaster extends AbstractService {
     dispatcher.register(TaskEventType.class, new TaskEventDispatcher());
     dispatcher.register(TaskAttemptEventType.class,
         new TaskAttemptEventDispatcher());
-    dispatcher.register(TaskCleaner.EventType.class, taskCleaner);
 
     taskSchedulerEventHandler = new TaskSchedulerEventHandler(context,
         clientRpcServer, dispatcher.getEventHandler(), containerSignatureMatcher);
@@ -557,11 +549,6 @@ public class DAGAppMaster extends AbstractService {
     return chh;
   }
 
-
-  protected TaskCleaner createTaskCleaner(AppContext context) {
-    return new TaskCleanerImpl(context);
-  }
-
   protected ContainerLauncher
       createContainerLauncher(final AppContext context) {
     return new ContainerLauncherImpl(context);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/74c62259/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
index 51104a8..9e39349 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
@@ -46,7 +46,6 @@ import org.apache.tez.common.ContainerTask;
 import org.apache.tez.common.TezConverterUtils;
 import org.apache.tez.common.TezLocalResource;
 import org.apache.tez.common.TezTaskUmbilicalProtocol;
-import org.apache.tez.common.records.ProceedToCompletionResponse;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.app.dag.DAG;
 import org.apache.tez.dag.app.dag.Task;
@@ -71,9 +70,6 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
   private static final ContainerTask TASK_FOR_INVALID_JVM = new ContainerTask(
       null, true, null);
 
-  private static ProceedToCompletionResponse COMPLETION_RESPONSE_NO_WAIT =
-      new ProceedToCompletionResponse(true, true);
-
   private static final Log LOG = LogFactory
       .getLog(TaskAttemptListenerImpTezDag.class);
 
@@ -355,55 +351,6 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
   }
 
   @Override
-  public ProceedToCompletionResponse
-      proceedToCompletion(TezTaskAttemptID taskAttemptId) throws IOException {
-
-    // The async nature of the processing combined with the 1 second interval
-    // between polls (MRTask) implies tasks end up wasting upto 1 second doing
-    // nothing. Similarly for CA_COMMIT.
-
-    /*
-    DAG job = context.getCurrentDAG();
-    Task task =
-        job.getVertex(taskAttemptId.getTaskID().getVertexID()).
-            getTask(taskAttemptId.getTaskID());
-
-    // TODO In-Memory Shuffle
-    if (task.needsWaitAfterOutputConsumable()) {
-      TezTaskAttemptID outputReadyAttempt = task.getOutputConsumableAttempt();
-      if (outputReadyAttempt != null) {
-        if (!outputReadyAttempt.equals(taskAttemptId)) {
-          LOG.info("Telling taksAttemptId: "
-              + taskAttemptId
-              + " to die, since the outputReady atempt for this task is different: "
-              + outputReadyAttempt);
-          return new ProceedToCompletionResponse(true, true);
-        }
-      }
-      boolean reducesDone = true;
-      for (Task rTask : job.getTasks(TaskType.REDUCE).values()) {
-        if (rTask.getState() != TaskState.SUCCEEDED) {
-          // TODO EVENTUALLY - could let the map tasks exit after reduces are
-          // done with the shuffle phase, instead of waiting for the reduces to
-          // complete.
-          reducesDone = false;
-          break;
-        }
-      }
-      if (reducesDone) {
-        return new ProceedToCompletionResponse(false, true);
-      } else {
-        return new ProceedToCompletionResponse(false, false);
-      }
-    } else {
-      return COMPLETION_RESPONSE_NO_WAIT;
-    }
-    */
-    return COMPLETION_RESPONSE_NO_WAIT;
-  }
-
-
-  @Override
   public void unregisterTaskAttempt(TezTaskAttemptID attemptId) {
     ContainerId containerId = attemptToInfoMap.get(attemptId);
     if(containerId == null) {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/74c62259/tez-dag/src/main/java/org/apache/tez/dag/app/taskclean/TaskAttemptCleanupEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/taskclean/TaskAttemptCleanupEvent.java
b/tez-dag/src/main/java/org/apache/tez/dag/app/taskclean/TaskAttemptCleanupEvent.java
deleted file mode 100644
index 8f718dc..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/taskclean/TaskAttemptCleanupEvent.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.dag.app.taskclean;
-
-import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.event.AbstractEvent;
-
-/**
- * This class encapsulates task cleanup event.
- * 
- */
-public class TaskAttemptCleanupEvent extends
-    AbstractEvent<TaskCleaner.EventType> {
-
-  private final TaskAttemptId attemptID;
-  private final OutputCommitter committer;
-  private final TaskAttemptContext attemptContext;
-  private final ContainerId containerId;
-
-  public TaskAttemptCleanupEvent(TaskAttemptId attemptID,
-      ContainerId containerId, OutputCommitter committer,
-      TaskAttemptContext attemptContext) {
-    super(TaskCleaner.EventType.TASK_CLEAN);
-    this.attemptID = attemptID;
-    this.containerId = containerId;
-    this.committer = committer;
-    this.attemptContext = attemptContext;
-  }
-
-  public TaskAttemptId getAttemptID() {
-    return attemptID;
-  }
-
-  public OutputCommitter getCommitter() {
-    return committer;
-  }
-
-  public TaskAttemptContext getAttemptContext() {
-    return attemptContext;
-  }
-
-  /**
-   * containerId could be null if the container task attempt had not started.
-   * @return
-   */
-  public ContainerId getContainerId() {
-    return containerId;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/74c62259/tez-dag/src/main/java/org/apache/tez/dag/app/taskclean/TaskCleaner.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/taskclean/TaskCleaner.java b/tez-dag/src/main/java/org/apache/tez/dag/app/taskclean/TaskCleaner.java
deleted file mode 100644
index 39d8e4c..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/taskclean/TaskCleaner.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.dag.app.taskclean;
-
-import org.apache.hadoop.yarn.event.EventHandler;
-
-public interface TaskCleaner extends EventHandler<TaskCleanupEvent> {
-
-  enum EventType {
-    // TODO XXX Rename this event once the code is more stable.
-    TASK_CLEAN,
-    CONTAINER_COMPLETED,
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/74c62259/tez-dag/src/main/java/org/apache/tez/dag/app/taskclean/TaskCleanerContainerCompletedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/taskclean/TaskCleanerContainerCompletedEvent.java
b/tez-dag/src/main/java/org/apache/tez/dag/app/taskclean/TaskCleanerContainerCompletedEvent.java
deleted file mode 100644
index 71f5169..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/taskclean/TaskCleanerContainerCompletedEvent.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.dag.app.taskclean;
-
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.event.AbstractEvent;
-
-public class TaskCleanerContainerCompletedEvent extends AbstractEvent<TaskCleaner.EventType>
{
-
-  private ContainerId containerId;
-  
-  public TaskCleanerContainerCompletedEvent(ContainerId containerId) {
-    super(TaskCleaner.EventType.CONTAINER_COMPLETED);
-    this.containerId = containerId;
-  }
-
-  public ContainerId getContainerId() {
-    return this.containerId;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/74c62259/tez-dag/src/main/java/org/apache/tez/dag/app/taskclean/TaskCleanerImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/taskclean/TaskCleanerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/taskclean/TaskCleanerImpl.java
deleted file mode 100644
index e8cb7b4..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/taskclean/TaskCleanerImpl.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.dag.app.taskclean;
-
-import java.util.Iterator;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.service.AbstractService;
-import org.apache.tez.dag.api.TezUncheckedException;
-import org.apache.tez.dag.app.AppContext;
-
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
-public class TaskCleanerImpl extends AbstractService implements TaskCleaner {
-
-  private static final Log LOG = LogFactory.getLog(TaskCleanerImpl.class);
-
-  private final AppContext context;
-  private ThreadPoolExecutor launcherPool;
-  private Thread eventHandlingThread;
-  private BlockingQueue<TaskCleanupEvent> eventQueue =
-      new LinkedBlockingQueue<TaskCleanupEvent>();
-
-  public TaskCleanerImpl(AppContext context) {
-    super("TaskCleaner");
-    this.context = context;
-  }
-
-  public void serviceStart() {
-    ThreadFactory tf = new ThreadFactoryBuilder()
-      .setNameFormat("TaskCleaner #%d")
-      .build();
-    launcherPool = new ThreadPoolExecutor(5, 5, 1, 
-        TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>(), tf);
-    eventHandlingThread = new Thread(new Runnable() {
-      @Override
-      public void run() {
-        TaskCleanupEvent event = null;
-        while (!Thread.currentThread().isInterrupted()) {
-          try {
-            event = eventQueue.take();
-          } catch (InterruptedException e) {
-            LOG.error("Returning, interrupted : " + e);
-            return;
-          }
-          // the events from the queue are handled in parallel
-          // using a thread pool
-          launcherPool.execute(new EventProcessor(event));        }
-      }
-    });
-    eventHandlingThread.setName("TaskCleaner Event Handler");
-    eventHandlingThread.start();
-  }
-
-  public void serviceStop() {
-    if (eventHandlingThread != null) {
-      eventHandlingThread.interrupt();
-    }
-    if (launcherPool != null) {
-      launcherPool.shutdown();
-    }
-    Iterator<TaskCleanupEvent> it = eventQueue.iterator();
-    while (it.hasNext()) {
-      TaskCleanupEvent ev = it.next();
-      LOG.info("TaskCleaner.stop: Cleanup for: " + ev.getAttemptID());
-      new EventProcessor(ev).run();
-    }
-  }
-
-  private class EventProcessor implements Runnable {
-    private TaskCleanupEvent event;
-
-    EventProcessor(TaskCleanupEvent event) {
-      this.event = event;
-    }
-
-    @Override
-    public void run() {
-      LOG.info("Processing the event " + event.toString());
-      try {
-        event.getCommitter().abortTask(event.getAttemptContext());
-      } catch (Exception e) {
-        LOG.warn("Task cleanup failed for attempt " + event.getAttemptID(), e);
-      }
-    }
-  }
-
-  @Override
-  public void handle(TaskCleanupEvent event) {
-    try {
-      eventQueue.put(event);
-    } catch (InterruptedException e) {
-      throw new TezUncheckedException(e);
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/74c62259/tez-dag/src/main/java/org/apache/tez/dag/app/taskclean/TaskCleanupEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/taskclean/TaskCleanupEvent.java
b/tez-dag/src/main/java/org/apache/tez/dag/app/taskclean/TaskCleanupEvent.java
deleted file mode 100644
index 9b6cc15..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/taskclean/TaskCleanupEvent.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.dag.app.taskclean;
-
-import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.yarn.event.AbstractEvent;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-
-/**
- * This class encapsulates task cleanup event.
- *
- */
-public class TaskCleanupEvent extends AbstractEvent<TaskCleaner.EventType> {
-  // TODO XXX: Rename to TaskAttemptCleanupEvent ?
-
-  // TODO XXX: Maybe include the containerId along with this event. Otherwise depend on events
coming in from the Container to include MRxTaskAttemptIDs.
-  private final TezTaskAttemptID attemptID;
-  private final OutputCommitter committer;
-  private final TaskAttemptContext attemptContext;
-
-  public TaskCleanupEvent(TezTaskAttemptID attemptID, OutputCommitter committer, 
-      TaskAttemptContext attemptContext) {
-    super(TaskCleaner.EventType.TASK_CLEAN);
-    this.attemptID = attemptID;
-    this.committer = committer;
-    this.attemptContext = attemptContext;
-  }
-
-  public TezTaskAttemptID getAttemptID() {
-    return attemptID;
-  }
-
-  public OutputCommitter getCommitter() {
-    return committer;
-  }
-
-  public TaskAttemptContext getAttemptContext() {
-    return attemptContext;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/74c62259/tez-dag/src/main/java/org/apache/tez/dag/app/taskclean/package-info.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/taskclean/package-info.java b/tez-dag/src/main/java/org/apache/tez/dag/app/taskclean/package-info.java
deleted file mode 100644
index 3e1dae6..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/taskclean/package-info.java
+++ /dev/null
@@ -1,20 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-@InterfaceAudience.Private
-package org.apache.tez.dag.app.taskclean;
-import org.apache.hadoop.classification.InterfaceAudience;

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/74c62259/tez-runtime-internals/src/main/java/org/apache/tez/common/TezTaskUmbilicalProtocol.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/common/TezTaskUmbilicalProtocol.java
b/tez-runtime-internals/src/main/java/org/apache/tez/common/TezTaskUmbilicalProtocol.java
index 0c55e8b..e3bd589 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/common/TezTaskUmbilicalProtocol.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/common/TezTaskUmbilicalProtocol.java
@@ -22,10 +22,8 @@ import java.io.IOException;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.ipc.ProtocolInfo;
 import org.apache.hadoop.ipc.VersionedProtocol;
 import org.apache.hadoop.security.token.TokenInfo;
-import org.apache.tez.common.records.ProceedToCompletionResponse;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.runtime.api.impl.TezHeartbeatRequest;
@@ -49,9 +47,6 @@ public interface TezTaskUmbilicalProtocol extends VersionedProtocol {
 
   boolean canCommit(TezTaskAttemptID taskid) throws IOException;
 
-  ProceedToCompletionResponse
-      proceedToCompletion(TezTaskAttemptID taskAttemptId) throws IOException;
-
   /// Copies from TezUmbilical until complete re-factor is done
   // TODONEWTEZ
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/74c62259/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MergeManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MergeManager.java
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MergeManager.java
index 27ebd5d..d959c02 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MergeManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MergeManager.java
@@ -674,6 +674,8 @@ public class MergeManager {
     }
     int maxInMemReduce = (int)Math.min(
         Runtime.getRuntime().maxMemory() * maxRedPer, Integer.MAX_VALUE);
+    LOG.info("Memory allocated for final merge output: " + maxInMemReduce + ", using factor:
"
+        + maxRedPer);
     
 
     // merge config params


Mime
View raw message