Return-Path: X-Original-To: apmail-hive-commits-archive@www.apache.org Delivered-To: apmail-hive-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 0C1C718A21 for ; Sat, 9 May 2015 01:46:33 +0000 (UTC) Received: (qmail 29936 invoked by uid 500); 9 May 2015 01:46:32 -0000 Delivered-To: apmail-hive-commits-archive@hive.apache.org Received: (qmail 29892 invoked by uid 500); 9 May 2015 01:46:32 -0000 Mailing-List: contact commits-help@hive.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: hive-dev@hive.apache.org Delivered-To: mailing list commits@hive.apache.org Received: (qmail 29880 invoked by uid 99); 9 May 2015 01:46:32 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 09 May 2015 01:46:32 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id AA31CE10A4; Sat, 9 May 2015 01:46:32 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sseth@apache.org To: commits@hive.apache.org Message-Id: <30533c1ae90d477bb1663e91a7664bbe@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: hive git commit: HIVE-10663. LLAP: Update tez dependency. (Siddharth Seth) Date: Sat, 9 May 2015 01:46:32 +0000 (UTC) Repository: hive Updated Branches: refs/heads/llap 20ac70b15 -> dc7ceb4e2 HIVE-10663. LLAP: Update tez dependency. 
(Siddharth Seth) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/dc7ceb4e Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/dc7ceb4e Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/dc7ceb4e Branch: refs/heads/llap Commit: dc7ceb4e2fa81592b35b3a67d4d2bbea22a14ea4 Parents: 20ac70b Author: Siddharth Seth Authored: Fri May 8 18:46:09 2015 -0700 Committer: Siddharth Seth Committed: Fri May 8 18:46:09 2015 -0700 ---------------------------------------------------------------------- llap-client/pom.xml | 2 +- llap-server/pom.xml | 2 +- .../hive/llap/daemon/impl/LlapTaskReporter.java | 53 ++++++++++++++------ pom.xml | 2 +- 4 files changed, 42 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/dc7ceb4e/llap-client/pom.xml ---------------------------------------------------------------------- diff --git a/llap-client/pom.xml b/llap-client/pom.xml index 8a1a8bd..87f53b7 100644 --- a/llap-client/pom.xml +++ b/llap-client/pom.xml @@ -19,7 +19,7 @@ <parent> <groupId>org.apache.hive</groupId> <artifactId>hive</artifactId> - <version>1.2.0-SNAPSHOT</version> + <version>1.3.0-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> http://git-wip-us.apache.org/repos/asf/hive/blob/dc7ceb4e/llap-server/pom.xml ---------------------------------------------------------------------- diff --git a/llap-server/pom.xml b/llap-server/pom.xml index 9325bd9..4fcd705 100644 --- a/llap-server/pom.xml +++ b/llap-server/pom.xml @@ -19,7 +19,7 @@ <parent> <groupId>org.apache.hive</groupId> <artifactId>hive</artifactId> - <version>1.2.0-SNAPSHOT</version> + <version>1.3.0-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> http://git-wip-us.apache.org/repos/asf/hive/blob/dc7ceb4e/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java index f3771ea..716fb23 100644 --- 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java @@ -27,6 +27,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Condition; @@ -130,7 +131,7 @@ public class LlapTaskReporter implements TaskReporterInterface { private static final float LOG_COUNTER_BACKOFF = 1.3f; private final RuntimeTask task; - private EventMetaData updateEventMetadata; + private final EventMetaData updateEventMetadata; private final LlapTaskUmbilicalProtocol umbilical; @@ -141,6 +142,9 @@ public class LlapTaskReporter implements TaskReporterInterface { private final AtomicLong requestCounter; + private final AtomicBoolean finalEventQueued = new AtomicBoolean(false); + private final AtomicBoolean askedToDie = new AtomicBoolean(false); + private LinkedBlockingQueue eventsToSend = new LinkedBlockingQueue(); private final ReentrantLock lock = new ReentrantLock(); @@ -204,6 +208,9 @@ public class LlapTaskReporter implements TaskReporterInterface { } int pendingEventCount = eventsToSend.size(); if (pendingEventCount > 0) { + // This is OK because the pending events will be sent via the succeeded/failed messages. 
+ // TaskDone is set before taskSucceeded/taskFailed are sent out - which is what causes the + // thread to exit LOG.warn("Exiting TaskReporter thread with pending queue size=" + pendingEventCount); } return true; @@ -245,8 +252,9 @@ public class LlapTaskReporter implements TaskReporterInterface { } long requestId = requestCounter.incrementAndGet(); + int fromEventId = task.getNextFromEventId(); TezHeartbeatRequest request = new TezHeartbeatRequest(requestId, events, containerIdStr, - task.getTaskAttemptID(), task.getEventCounter(), maxEventsToGet); + task.getTaskAttemptID(), fromEventId, maxEventsToGet); if (LOG.isDebugEnabled()) { LOG.debug("Sending heartbeat to AM, request=" + request); } @@ -260,6 +268,7 @@ public class LlapTaskReporter implements TaskReporterInterface { if (response.shouldDie()) { LOG.info("Received should die response from AM"); + askedToDie.set(true); return new ResponseWrapper(true, 1); } if (response.getLastRequestId() != requestId) { @@ -276,10 +285,13 @@ public class LlapTaskReporter implements TaskReporterInterface { + " heartbeat response, eventCount=" + response.getEvents().size()); } } else { + task.setNextFromEventId(response.getNextFromEventId()); if (response.getEvents() != null && !response.getEvents().isEmpty()) { if (LOG.isDebugEnabled()) { LOG.debug("Routing events from heartbeat response to task" + ", currentTaskAttemptId=" - + task.getTaskAttemptID() + ", eventCount=" + response.getEvents().size()); + + task.getTaskAttemptID() + ", eventCount=" + response.getEvents().size() + + " fromEventId=" + fromEventId + + " nextFromEventId=" + response.getNextFromEventId()); } // This should ideally happen in a separate thread numEventsReceived = response.getEvents().size(); @@ -318,10 +330,16 @@ public class LlapTaskReporter implements TaskReporterInterface { * indicates an exception somewhere in the AM. 
*/ private boolean taskSucceeded(TezTaskAttemptID taskAttemptID) throws IOException, TezException { - TezEvent statusUpdateEvent = new TezEvent(getStatusUpdateEvent(true), updateEventMetadata); - TezEvent taskCompletedEvent = new TezEvent(new TaskAttemptCompletedEvent(), - updateEventMetadata); - return !heartbeat(Lists.newArrayList(statusUpdateEvent, taskCompletedEvent)).shouldDie; + // Ensure only one final event is ever sent. + if (!finalEventQueued.getAndSet(true)) { + TezEvent statusUpdateEvent = new TezEvent(getStatusUpdateEvent(true), updateEventMetadata); + TezEvent taskCompletedEvent = new TezEvent(new TaskAttemptCompletedEvent(), + updateEventMetadata); + return !heartbeat(Lists.newArrayList(statusUpdateEvent, taskCompletedEvent)).shouldDie; + } else { + LOG.warn("A final task state event has already been sent. Not sending again"); + return askedToDie.get(); + } } private TaskStatusUpdateEvent getStatusUpdateEvent(boolean sendCounters) { @@ -353,15 +371,22 @@ public class LlapTaskReporter implements TaskReporterInterface { */ private boolean taskFailed(TezTaskAttemptID taskAttemptID, Throwable t, String diagnostics, EventMetaData srcMeta) throws IOException, TezException { - TezEvent statusUpdateEvent = new TezEvent(getStatusUpdateEvent(true), updateEventMetadata); - if (diagnostics == null) { - diagnostics = ExceptionUtils.getStackTrace(t); + // Ensure only one final event is ever sent. + if (!finalEventQueued.getAndSet(true)) { + TezEvent statusUpdateEvent = new TezEvent(getStatusUpdateEvent(true), updateEventMetadata); + if (diagnostics == null) { + diagnostics = ExceptionUtils.getStackTrace(t); + } else { + diagnostics = diagnostics + ":" + ExceptionUtils.getStackTrace(t); + } + TezEvent taskAttemptFailedEvent = new TezEvent(new TaskAttemptFailedEvent(diagnostics), + srcMeta == null ? 
updateEventMetadata : srcMeta); + return !heartbeat(Lists.newArrayList(statusUpdateEvent, taskAttemptFailedEvent)).shouldDie; } else { - diagnostics = diagnostics + ":" + ExceptionUtils.getStackTrace(t); + LOG.warn("A final task state event has already been sent. Not sending again"); + return askedToDie.get(); } - TezEvent taskAttemptFailedEvent = new TezEvent(new TaskAttemptFailedEvent(diagnostics), - srcMeta == null ? updateEventMetadata : srcMeta); - return !heartbeat(Lists.newArrayList(statusUpdateEvent, taskAttemptFailedEvent)).shouldDie; + } private void addEvents(TezTaskAttemptID taskAttemptID, Collection events) { http://git-wip-us.apache.org/repos/asf/hive/blob/dc7ceb4e/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index d4eb8e5..60a18c3 100644 --- a/pom.xml +++ b/pom.xml @@ -156,7 +156,7 @@ 1.0.1 1.7.5 4.0.4 - 0.7.0-TEZ-2003-SNAPSHOT + 0.8.0-TEZ-2003-SNAPSHOT 2.2.0 1.3.0 2.10