Return-Path: X-Original-To: apmail-tez-commits-archive@minotaur.apache.org Delivered-To: apmail-tez-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id BCE361888F for ; Thu, 6 Aug 2015 09:25:57 +0000 (UTC) Received: (qmail 54173 invoked by uid 500); 6 Aug 2015 09:25:54 -0000 Delivered-To: apmail-tez-commits-archive@tez.apache.org Received: (qmail 54067 invoked by uid 500); 6 Aug 2015 09:25:54 -0000 Mailing-List: contact commits-help@tez.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tez.apache.org Delivered-To: mailing list commits@tez.apache.org Received: (qmail 52894 invoked by uid 99); 6 Aug 2015 09:25:54 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 06 Aug 2015 09:25:54 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id C576DE6824; Thu, 6 Aug 2015 09:25:53 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sseth@apache.org To: commits@tez.apache.org Date: Thu, 06 Aug 2015 09:26:10 -0000 Message-Id: <0b0cdce397574babb28c23d6b29ccd35@git.apache.org> In-Reply-To: <10a0859464864279b3a9f48415e4a976@git.apache.org> References: <10a0859464864279b3a9f48415e4a976@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [18/51] [abbrv] tez git commit: TEZ-2284. Separate TaskReporter into an interface. (sseth) TEZ-2284. Separate TaskReporter into an interface. (sseth) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/08a196a1 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/08a196a1 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/08a196a1 Branch: refs/heads/TEZ-2003 Commit: 08a196a17f409024f410bb5c108bb1ead0099fb0 Parents: 1f75f32 Author: Siddharth Seth Authored: Tue Apr 7 13:21:35 2015 -0700 Committer: Siddharth Seth Committed: Thu Aug 6 01:25:10 2015 -0700 ---------------------------------------------------------------------- TEZ-2003-CHANGES.txt | 1 + .../internals/api/TaskReporterInterface.java | 46 ++++++++++++++++++++ .../apache/tez/runtime/task/TaskReporter.java | 12 ++++- .../org/apache/tez/runtime/task/TezChild.java | 3 +- .../apache/tez/runtime/task/TezTaskRunner.java | 5 ++- 5 files changed, 62 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/08a196a1/TEZ-2003-CHANGES.txt ---------------------------------------------------------------------- diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt index 6a4399c..e2c428d 100644 --- a/TEZ-2003-CHANGES.txt +++ b/TEZ-2003-CHANGES.txt @@ -13,5 +13,6 @@ ALL CHANGES: TEZ-2187. Allow TaskCommunicators to report failed / killed attempts. TEZ-2241. Miscellaneous fixes after last reabse. TEZ-2283. Fixes after rebase 04/07. + TEZ-2284. Separate TaskReporter into an interface. INCOMPATIBLE CHANGES: http://git-wip-us.apache.org/repos/asf/tez/blob/08a196a1/tez-runtime-internals/src/main/java/org/apache/tez/runtime/internals/api/TaskReporterInterface.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/internals/api/TaskReporterInterface.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/internals/api/TaskReporterInterface.java new file mode 100644 index 0000000..47a61ab --- /dev/null +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/internals/api/TaskReporterInterface.java @@ -0,0 +1,46 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.runtime.internals.api; + +import java.io.IOException; +import java.util.Collection; + +import org.apache.tez.dag.api.TezException; +import org.apache.tez.dag.records.TezTaskAttemptID; +import org.apache.tez.runtime.RuntimeTask; +import org.apache.tez.runtime.api.impl.EventMetaData; +import org.apache.tez.runtime.api.impl.TezEvent; +import org.apache.tez.runtime.task.ErrorReporter; + +public interface TaskReporterInterface { + + // TODO TEZ-2003 Consolidate private API usage if making this public + + void registerTask(RuntimeTask task, ErrorReporter errorReporter); + + void unregisterTask(TezTaskAttemptID taskAttemptId); + + boolean taskSucceeded(TezTaskAttemptID taskAttemptId) throws IOException, TezException; + + boolean taskFailed(TezTaskAttemptID taskAttemptId, Throwable cause, String diagnostics, EventMetaData srcMeta) throws IOException, + TezException; + + void addEvents(TezTaskAttemptID taskAttemptId, Collection events); + + boolean canCommit(TezTaskAttemptID taskAttemptId) throws IOException; + + void shutdown(); + +} http://git-wip-us.apache.org/repos/asf/tez/blob/08a196a1/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java index d9a7786..3579e3f 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java @@ -48,6 +48,7 @@ import org.apache.tez.runtime.api.impl.TezEvent; import org.apache.tez.runtime.api.impl.TezHeartbeatRequest; import org.apache.tez.runtime.api.impl.TezHeartbeatResponse; import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType; +import org.apache.tez.runtime.internals.api.TaskReporterInterface; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -66,7 +67,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; * retrieve events specific to this task. * */ -public class TaskReporter { +public class TaskReporter implements TaskReporterInterface { private static final Logger LOG = LoggerFactory.getLogger(TaskReporter.class); @@ -98,6 +99,7 @@ public class TaskReporter { /** * Register a task to be tracked. Heartbeats will be sent out for this task to fetch events, etc. */ + @Override public synchronized void registerTask(RuntimeTask task, ErrorReporter errorReporter) { currentCallable = new HeartbeatCallable(task, umbilical, pollInterval, sendCounterInterval, @@ -110,12 +112,14 @@ public class TaskReporter { * This method should always be invoked before setting up heartbeats for another task running in * the same container. */ + @Override public synchronized void unregisterTask(TezTaskAttemptID taskAttemptID) { currentCallable.markComplete(); currentCallable = null; // KKK Make sure the callable completes before proceeding } - + + @Override public void shutdown() { heartbeatExecutor.shutdownNow(); } @@ -413,19 +417,23 @@ public class TaskReporter { } } + @Override public synchronized boolean taskSucceeded(TezTaskAttemptID taskAttemptID) throws IOException, TezException { return currentCallable.taskSucceeded(taskAttemptID); } + @Override public synchronized boolean taskFailed(TezTaskAttemptID taskAttemptID, Throwable t, String diagnostics, EventMetaData srcMeta) throws IOException, TezException { return currentCallable.taskFailed(taskAttemptID, t, diagnostics, srcMeta); } + @Override public synchronized void addEvents(TezTaskAttemptID taskAttemptID, Collection events) { currentCallable.addEvents(taskAttemptID, events); } + @Override public boolean canCommit(TezTaskAttemptID taskAttemptID) throws IOException { return umbilical.canCommit(taskAttemptID); } http://git-wip-us.apache.org/repos/asf/tez/blob/08a196a1/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java index 32da8fb..4c8bebc 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java @@ -69,6 +69,7 @@ import org.apache.tez.runtime.api.impl.ExecutionContextImpl; import org.apache.tez.runtime.api.impl.TaskSpec; import org.apache.tez.runtime.api.impl.TezUmbilical; import org.apache.tez.runtime.common.objectregistry.ObjectRegistryImpl; +import org.apache.tez.runtime.internals.api.TaskReporterInterface; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -114,7 +115,7 @@ public class TezChild { private final boolean ownUmbilical; private final TezTaskUmbilicalProtocol umbilical; - private TaskReporter taskReporter; + private TaskReporterInterface taskReporter; private int taskCount = 0; private TezVertexID lastVertexID; http://git-wip-us.apache.org/repos/asf/tez/blob/08a196a1/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java index f54814b..33a7f4a 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java @@ -41,6 +41,7 @@ import org.apache.tez.runtime.api.impl.EventMetaData; import org.apache.tez.runtime.api.impl.TaskSpec; import org.apache.tez.runtime.api.impl.TezEvent; import org.apache.tez.runtime.api.impl.TezUmbilical; +import org.apache.tez.runtime.internals.api.TaskReporterInterface; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -56,7 +57,7 @@ public class TezTaskRunner implements TezUmbilical, ErrorReporter { private final LogicalIOProcessorRuntimeTask task; private final UserGroupInformation ugi; - private final TaskReporter taskReporter; + private final TaskReporterInterface taskReporter; private final ListeningExecutorService executor; private volatile ListenableFuture taskFuture; private volatile Thread waitingThread; @@ -70,7 +71,7 @@ public class TezTaskRunner implements TezUmbilical, ErrorReporter { public TezTaskRunner(Configuration tezConf, UserGroupInformation ugi, String[] localDirs, TaskSpec taskSpec, int appAttemptNumber, Map serviceConsumerMetadata, Map serviceProviderEnvMap, - Multimap startedInputsMap, TaskReporter taskReporter, + Multimap startedInputsMap, TaskReporterInterface taskReporter, ListeningExecutorService executor, ObjectRegistry objectRegistry, String pid, ExecutionContext executionContext, long memAvailable) throws IOException {