Return-Path: X-Original-To: apmail-drill-commits-archive@www.apache.org Delivered-To: apmail-drill-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 2E593184BD for ; Mon, 8 Jun 2015 19:07:27 +0000 (UTC) Received: (qmail 50009 invoked by uid 500); 8 Jun 2015 19:07:27 -0000 Delivered-To: apmail-drill-commits-archive@drill.apache.org Received: (qmail 49965 invoked by uid 500); 8 Jun 2015 19:07:27 -0000 Mailing-List: contact commits-help@drill.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: commits@drill.apache.org Delivered-To: mailing list commits@drill.apache.org Received: (qmail 49952 invoked by uid 99); 8 Jun 2015 19:07:27 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 08 Jun 2015 19:07:27 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id DDB44DFFD5; Mon, 8 Jun 2015 19:07:26 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: venki@apache.org To: commits@drill.apache.org Date: Mon, 08 Jun 2015 19:07:26 -0000 Message-Id: <2f6ae2c9fbce47c2adfa7f5c6080ba2c@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/5] drill git commit: DRILL-3190: Check for transitions from CANCELLATION_REQUESTED state + Moved state transition checks to QueryManager Repository: drill Updated Branches: refs/heads/master 1de6aed93 -> 6796006f2 DRILL-3190: Check for transitions from CANCELLATION_REQUESTED state + Moved state transition checks to QueryManager Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/d9452d97 Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/d9452d97 Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/d9452d97 Branch: refs/heads/master Commit: d9452d973e1bfd7f0aa2e6c28b3e8c72817628a3 Parents: 1de6aed Author: Sudheesh Katkam Authored: Mon Jun 1 09:42:33 2015 -0700 Committer: vkorukanti Committed: Fri Jun 5 07:47:09 2015 -0700 ---------------------------------------------------------------------- .../drill/exec/work/foreman/FragmentData.java | 28 +++++--------------- .../drill/exec/work/foreman/QueryManager.java | 25 ++++++++++++++--- 2 files changed, 27 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/d9452d97/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentData.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentData.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentData.java index ceb77f0..9e07210 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentData.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentData.java @@ -23,10 +23,9 @@ import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; import org.apache.drill.exec.proto.UserBitShared.FragmentState; import org.apache.drill.exec.proto.UserBitShared.MinorFragmentProfile; import org.apache.drill.exec.proto.UserBitShared.OperatorProfile; -import org.apache.drill.exec.proto.helper.QueryIdHelper; public class FragmentData { - private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentData.class); +// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentData.class); private final boolean isLocal; private volatile FragmentStatus status; @@ -49,31 +48,16 @@ public class FragmentData { } /** - * Update the status for this fragment. Also records last update and last progress time. - * @param status Updated status - * @return Whether or not the status update resulted in a FragmentState change. + * Update the status for this fragment. Also records last update and last progress time. + * @param newStatus Updated status */ - public boolean setStatus(final FragmentStatus newStatus) { + public void setStatus(final FragmentStatus newStatus) { final long time = System.currentTimeMillis(); - final FragmentState oldState = status.getProfile().getState(); - final boolean inTerminalState = oldState == FragmentState.FAILED || oldState == FragmentState.FINISHED || oldState == FragmentState.CANCELLED; - final FragmentState currentState = newStatus.getProfile().getState(); - final boolean stateChanged = currentState != oldState; - - if (inTerminalState) { - // already in a terminal state. This shouldn't happen. - logger.warn(String.format("Received status message for fragment %s after fragment was in state %s. New state was %s", - QueryIdHelper.getQueryIdentifier(getHandle()), oldState, currentState)); - return false; - } - - this.lastStatusUpdate = time; + lastStatusUpdate = time; if (madeProgress(status, newStatus)) { - this.lastProgress = time; + lastProgress = time; } status = newStatus; - - return stateChanged; } public FragmentState getState() { http://git-wip-us.apache.org/repos/asf/drill/blob/d9452d97/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java index 71b77c6..a7dfe85 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java @@ -35,6 +35,7 @@ import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; import org.apache.drill.exec.proto.GeneralRPCProtos.Ack; import org.apache.drill.exec.proto.SchemaUserBitShared; +import org.apache.drill.exec.proto.UserBitShared.FragmentState; import org.apache.drill.exec.proto.UserBitShared.MajorFragmentProfile; import org.apache.drill.exec.proto.UserBitShared.QueryId; import org.apache.drill.exec.proto.UserBitShared.QueryInfo; @@ -50,10 +51,7 @@ import org.apache.drill.exec.store.sys.PStore; import org.apache.drill.exec.store.sys.PStoreConfig; import org.apache.drill.exec.store.sys.PStoreProvider; import org.apache.drill.exec.work.EndpointListener; -import org.apache.drill.exec.work.WorkManager; import org.apache.drill.exec.work.foreman.Foreman.StateListener; -import org.apache.drill.exec.work.fragment.AbstractStatusReporter; -import org.apache.drill.exec.work.fragment.FragmentExecutor; import org.apache.drill.exec.work.fragment.NonRootStatusReporter; import org.apache.drill.exec.work.fragment.StatusReporter; @@ -126,12 +124,31 @@ public class QueryManager { } } + private static boolean isTerminal(final FragmentState state) { + return state == FragmentState.FAILED + || state == FragmentState.FINISHED + || state == FragmentState.CANCELLED; + } + private boolean updateFragmentStatus(final FragmentStatus fragmentStatus) { final FragmentHandle fragmentHandle = fragmentStatus.getHandle(); final int majorFragmentId = fragmentHandle.getMajorFragmentId(); final int minorFragmentId = fragmentHandle.getMinorFragmentId(); final FragmentData data = fragmentDataMap.get(majorFragmentId).get(minorFragmentId); - return data.setStatus(fragmentStatus); + + final FragmentState oldState = data.getState(); + final boolean inTerminalState = isTerminal(oldState); + final FragmentState currentState = fragmentStatus.getProfile().getState(); + + if (inTerminalState || (oldState == FragmentState.CANCELLATION_REQUESTED && !isTerminal(currentState))) { + // Already in a terminal state, or invalid state transition from CANCELLATION_REQUESTED. This shouldn't happen. + logger.warn(String.format("Received status message for fragment %s after fragment was in state %s. New state was %s", + QueryIdHelper.getQueryIdentifier(fragmentHandle), oldState, currentState)); + return false; + } + + data.setStatus(fragmentStatus); + return oldState != currentState; } private void fragmentDone(final FragmentStatus status) {