tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject [1/2] tez git commit: TEZ-2284. Separate TaskReporter into an interface. (sseth)
Date Tue, 07 Apr 2015 20:22:28 GMT
Repository: tez
Updated Branches:
  refs/heads/TEZ-2003 f80e3cc5e -> b3c792b61


TEZ-2284. Separate TaskReporter into an interface. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/3bf71c51
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/3bf71c51
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/3bf71c51

Branch: refs/heads/TEZ-2003
Commit: 3bf71c518c641515c3b8380b66359095e631415a
Parents: f80e3cc
Author: Siddharth Seth <sseth@apache.org>
Authored: Tue Apr 7 13:21:35 2015 -0700
Committer: Siddharth Seth <sseth@apache.org>
Committed: Tue Apr 7 13:21:35 2015 -0700

----------------------------------------------------------------------
 TEZ-2003-CHANGES.txt                            |  1 +
 .../internals/api/TaskReporterInterface.java    | 46 ++++++++++++++++++++
 .../apache/tez/runtime/task/TaskReporter.java   | 20 ++++++---
 .../org/apache/tez/runtime/task/TezChild.java   |  3 +-
 .../apache/tez/runtime/task/TezTaskRunner.java  |  5 ++-
 5 files changed, 66 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/3bf71c51/TEZ-2003-CHANGES.txt
----------------------------------------------------------------------
diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt
index 6a4399c..e2c428d 100644
--- a/TEZ-2003-CHANGES.txt
+++ b/TEZ-2003-CHANGES.txt
@@ -13,5 +13,6 @@ ALL CHANGES:
   TEZ-2187. Allow TaskCommunicators to report failed / killed attempts.
   TEZ-2241. Miscellaneous fixes after last reabse.
   TEZ-2283. Fixes after rebase 04/07.
+  TEZ-2284. Separate TaskReporter into an interface.
 
 INCOMPATIBLE CHANGES:

http://git-wip-us.apache.org/repos/asf/tez/blob/3bf71c51/tez-runtime-internals/src/main/java/org/apache/tez/runtime/internals/api/TaskReporterInterface.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/internals/api/TaskReporterInterface.java
b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/internals/api/TaskReporterInterface.java
new file mode 100644
index 0000000..47a61ab
--- /dev/null
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/internals/api/TaskReporterInterface.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.internals.api;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.runtime.RuntimeTask;
+import org.apache.tez.runtime.api.impl.EventMetaData;
+import org.apache.tez.runtime.api.impl.TezEvent;
+import org.apache.tez.runtime.task.ErrorReporter;
+
+public interface TaskReporterInterface {
+
+  // TODO TEZ-2003 Consolidate private API usage if making this public
+
+  void registerTask(RuntimeTask task, ErrorReporter errorReporter);
+
+  void unregisterTask(TezTaskAttemptID taskAttemptId);
+
+  boolean taskSucceeded(TezTaskAttemptID taskAttemptId) throws IOException, TezException;
+
+  boolean taskFailed(TezTaskAttemptID taskAttemptId, Throwable cause, String diagnostics,
EventMetaData srcMeta) throws IOException,
+      TezException;
+
+  void addEvents(TezTaskAttemptID taskAttemptId, Collection<TezEvent> events);
+
+  boolean canCommit(TezTaskAttemptID taskAttemptId) throws IOException;
+
+  void shutdown();
+
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/3bf71c51/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
index 4c07f5a..4b12d39 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
@@ -37,7 +37,7 @@ import org.apache.tez.common.TezTaskUmbilicalProtocol;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.runtime.LogicalIOProcessorRuntimeTask;
+import org.apache.tez.runtime.RuntimeTask;
 import org.apache.tez.runtime.api.events.TaskAttemptCompletedEvent;
 import org.apache.tez.runtime.api.events.TaskAttemptFailedEvent;
 import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
@@ -46,6 +46,7 @@ import org.apache.tez.runtime.api.impl.TezEvent;
 import org.apache.tez.runtime.api.impl.TezHeartbeatRequest;
 import org.apache.tez.runtime.api.impl.TezHeartbeatResponse;
 import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
+import org.apache.tez.runtime.internals.api.TaskReporterInterface;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -64,7 +65,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
  * retrieve events specific to this task.
  * 
  */
-public class TaskReporter {
+public class TaskReporter implements TaskReporterInterface {
 
   private static final Logger LOG = LoggerFactory.getLogger(TaskReporter.class);
 
@@ -96,7 +97,8 @@ public class TaskReporter {
   /**
    * Register a task to be tracked. Heartbeats will be sent out for this task to fetch events,
etc.
    */
-  public synchronized void registerTask(LogicalIOProcessorRuntimeTask task,
+  @Override
+  public synchronized void registerTask(RuntimeTask task,
       ErrorReporter errorReporter) {
     currentCallable = new HeartbeatCallable(task, umbilical, pollInterval, sendCounterInterval,
         maxEventsToGet, requestCounter, containerIdStr);
@@ -108,11 +110,13 @@ public class TaskReporter {
    * This method should always be invoked before setting up heartbeats for another task running
in
    * the same container.
    */
+  @Override
   public synchronized void unregisterTask(TezTaskAttemptID taskAttemptID) {
     currentCallable.markComplete();
     currentCallable = null;
   }
-  
+
+  @Override
   public void shutdown() {
     heartbeatExecutor.shutdownNow();
   }
@@ -123,7 +127,7 @@ public class TaskReporter {
     private static final int LOG_COUNTER_START_INTERVAL = 5000; // 5 seconds
     private static final float LOG_COUNTER_BACKOFF = 1.3f;
 
-    private final LogicalIOProcessorRuntimeTask task;
+    private final RuntimeTask task;
     private EventMetaData updateEventMetadata;
 
     private final TezTaskUmbilicalProtocol umbilical;
@@ -151,7 +155,7 @@ public class TaskReporter {
      */
     private int prevCounterSendHeartbeatNum = 0;
 
-    public HeartbeatCallable(LogicalIOProcessorRuntimeTask task,
+    public HeartbeatCallable(RuntimeTask task,
         TezTaskUmbilicalProtocol umbilical, long amPollInterval, long sendCounterInterval,
         int maxEventsToGet, AtomicLong requestCounter, String containerIdStr) {
 
@@ -374,19 +378,23 @@ public class TaskReporter {
     }
   }
 
+  @Override
   public synchronized boolean taskSucceeded(TezTaskAttemptID taskAttemptID) throws IOException,
TezException {
     return currentCallable.taskSucceeded(taskAttemptID);
   }
 
+  @Override
   public synchronized boolean taskFailed(TezTaskAttemptID taskAttemptID, Throwable t, String
diagnostics,
       EventMetaData srcMeta) throws IOException, TezException {
     return currentCallable.taskFailed(taskAttemptID, t, diagnostics, srcMeta);
   }
 
+  @Override
   public synchronized void addEvents(TezTaskAttemptID taskAttemptID, Collection<TezEvent>
events) {
     currentCallable.addEvents(taskAttemptID, events);
   }
 
+  @Override
   public boolean canCommit(TezTaskAttemptID taskAttemptID) throws IOException {
     return umbilical.canCommit(taskAttemptID);
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/3bf71c51/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
index 7615f08..c4fd64c 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
@@ -66,6 +66,7 @@ import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
 import org.apache.tez.runtime.api.impl.TaskSpec;
 import org.apache.tez.runtime.api.impl.TezUmbilical;
 import org.apache.tez.runtime.common.objectregistry.ObjectRegistryImpl;
+import org.apache.tez.runtime.internals.api.TaskReporterInterface;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -111,7 +112,7 @@ public class TezChild {
   private final boolean ownUmbilical;
 
   private final TezTaskUmbilicalProtocol umbilical;
-  private TaskReporter taskReporter;
+  private TaskReporterInterface taskReporter;
   private int taskCount = 0;
   private TezVertexID lastVertexID;
 

http://git-wip-us.apache.org/repos/asf/tez/blob/3bf71c51/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java
b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java
index 84ebffe..cf261d7 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java
@@ -41,6 +41,7 @@ import org.apache.tez.runtime.api.impl.EventMetaData;
 import org.apache.tez.runtime.api.impl.TaskSpec;
 import org.apache.tez.runtime.api.impl.TezEvent;
 import org.apache.tez.runtime.api.impl.TezUmbilical;
+import org.apache.tez.runtime.internals.api.TaskReporterInterface;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -56,7 +57,7 @@ public class TezTaskRunner implements TezUmbilical, ErrorReporter {
   private final LogicalIOProcessorRuntimeTask task;
   private final UserGroupInformation ugi;
 
-  private final TaskReporter taskReporter;
+  private final TaskReporterInterface taskReporter;
   private final ListeningExecutorService executor;
   private volatile ListenableFuture<Void> taskFuture;
   private volatile Thread waitingThread;
@@ -70,7 +71,7 @@ public class TezTaskRunner implements TezUmbilical, ErrorReporter {
   public TezTaskRunner(Configuration tezConf, UserGroupInformation ugi, String[] localDirs,
       TaskSpec taskSpec, int appAttemptNumber,
       Map<String, ByteBuffer> serviceConsumerMetadata, Map<String, String> serviceProviderEnvMap,
-      Multimap<String, String> startedInputsMap, TaskReporter taskReporter,
+      Multimap<String, String> startedInputsMap, TaskReporterInterface taskReporter,
       ListeningExecutorService executor, ObjectRegistry objectRegistry, String pid,
       ExecutionContext executionContext, long memAvailable)
           throws IOException {


Mime
View raw message