Return-Path: X-Original-To: apmail-hive-commits-archive@www.apache.org Delivered-To: apmail-hive-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 3FD8418DC1 for ; Fri, 18 Dec 2015 22:00:48 +0000 (UTC) Received: (qmail 20850 invoked by uid 500); 18 Dec 2015 22:00:48 -0000 Delivered-To: apmail-hive-commits-archive@hive.apache.org Received: (qmail 20807 invoked by uid 500); 18 Dec 2015 22:00:48 -0000 Mailing-List: contact commits-help@hive.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: hive-dev@hive.apache.org Delivered-To: mailing list commits@hive.apache.org Received: (qmail 20796 invoked by uid 99); 18 Dec 2015 22:00:47 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 18 Dec 2015 22:00:47 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id C308EE054A; Fri, 18 Dec 2015 22:00:47 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: prasanthj@apache.org To: commits@hive.apache.org Message-Id: <75fe56261aad4dea9c089412207815fa@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: hive git commit: HIVE-12658: Task rejection by an llap daemon spams the log with RejectedExecutionExceptions (Prasanth Jayachandran reviewed by Siddharth Seth) Date: Fri, 18 Dec 2015 22:00:47 +0000 (UTC) Repository: hive Updated Branches: refs/heads/branch-2.0 a17c95e04 -> 1d5e9c96c HIVE-12658: Task rejection by an llap daemon spams the log with RejectedExecutionExceptions (Prasanth Jayachandran reviewed by Siddharth Seth) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/1d5e9c96 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/1d5e9c96 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/1d5e9c96 Branch: refs/heads/branch-2.0 Commit: 1d5e9c96ced95f2c897f83937fd65cfa06bd312d Parents: a17c95e Author: Prasanth Jayachandran Authored: Fri Dec 18 13:45:02 2015 -0600 Committer: Prasanth Jayachandran Committed: Fri Dec 18 16:00:31 2015 -0600 ---------------------------------------------------------------------- .../daemon/rpc/LlapDaemonProtocolProtos.java | 233 +++++++++++++++++-- .../hive/llap/daemon/ContainerRunner.java | 13 +- .../llap/daemon/impl/ContainerRunnerImpl.java | 42 +++- .../hive/llap/daemon/impl/LlapDaemon.java | 22 +- .../impl/LlapDaemonProtocolServerImpl.java | 12 +- .../hadoop/hive/llap/daemon/impl/Scheduler.java | 11 +- .../llap/daemon/impl/TaskExecutorService.java | 28 ++- .../llap/tezplugins/LlapTaskCommunicator.java | 41 ++-- .../src/protobuf/LlapDaemonProtocol.proto | 7 + .../impl/TestLlapDaemonProtocolServerImpl.java | 19 +- .../daemon/impl/TestTaskExecutorService.java | 31 +-- 11 files changed, 355 insertions(+), 104 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/1d5e9c96/llap-server/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java ---------------------------------------------------------------------- diff --git a/llap-server/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java b/llap-server/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java index af009b8..d2180e5 100644 --- a/llap-server/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java +++ b/llap-server/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java @@ -90,6 +90,97 @@ public final class LlapDaemonProtocolProtos { // @@protoc_insertion_point(enum_scope:SourceStateProto) } + /** + * Protobuf enum {@code SubmissionStateProto} + */ + public enum SubmissionStateProto + implements com.google.protobuf.ProtocolMessageEnum { + /** + * ACCEPTED = 1; + */ + ACCEPTED(0, 1), + /** + * REJECTED = 2; + */ + REJECTED(1, 2), + /** + * EVICTED_OTHER = 3; + */ + EVICTED_OTHER(2, 3), + ; + + /** + * ACCEPTED = 1; + */ + public static final int ACCEPTED_VALUE = 1; + /** + * REJECTED = 2; + */ + public static final int REJECTED_VALUE = 2; + /** + * EVICTED_OTHER = 3; + */ + public static final int EVICTED_OTHER_VALUE = 3; + + + public final int getNumber() { return value; } + + public static SubmissionStateProto valueOf(int value) { + switch (value) { + case 1: return ACCEPTED; + case 2: return REJECTED; + case 3: return EVICTED_OTHER; + default: return null; + } + } + + public static com.google.protobuf.Internal.EnumLiteMap + internalGetValueMap() { + return internalValueMap; + } + private static com.google.protobuf.Internal.EnumLiteMap + internalValueMap = + new com.google.protobuf.Internal.EnumLiteMap() { + public SubmissionStateProto findValueByNumber(int number) { + return SubmissionStateProto.valueOf(number); + } + }; + + public final com.google.protobuf.Descriptors.EnumValueDescriptor + getValueDescriptor() { + return getDescriptor().getValues().get(index); + } + public final com.google.protobuf.Descriptors.EnumDescriptor + getDescriptorForType() { + return getDescriptor(); + } + public static final com.google.protobuf.Descriptors.EnumDescriptor + getDescriptor() { + return org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.getDescriptor().getEnumTypes().get(1); + } + + private static final SubmissionStateProto[] VALUES = values(); + + public static SubmissionStateProto valueOf( + com.google.protobuf.Descriptors.EnumValueDescriptor desc) { + if (desc.getType() != getDescriptor()) { + throw new java.lang.IllegalArgumentException( + "EnumValueDescriptor is not for this type."); + } + return VALUES[desc.getIndex()]; + } + + private final int index; + private final int value; + + private SubmissionStateProto(int index, int value) { + this.index = index; + this.value = value; + } + + // @@protoc_insertion_point(enum_scope:SubmissionStateProto) + } + public interface UserPayloadProtoOrBuilder extends com.google.protobuf.MessageOrBuilder { @@ -8265,6 +8356,16 @@ public final class LlapDaemonProtocolProtos { public interface SubmitWorkResponseProtoOrBuilder extends com.google.protobuf.MessageOrBuilder { + + // optional .SubmissionStateProto submission_state = 1; + /** + * optional .SubmissionStateProto submission_state = 1; + */ + boolean hasSubmissionState(); + /** + * optional .SubmissionStateProto submission_state = 1; + */ + org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmissionStateProto getSubmissionState(); } /** * Protobuf type {@code SubmitWorkResponseProto} @@ -8299,6 +8400,7 @@ public final class LlapDaemonProtocolProtos { com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws com.google.protobuf.InvalidProtocolBufferException { initFields(); + int mutable_bitField0_ = 0; com.google.protobuf.UnknownFieldSet.Builder unknownFields = com.google.protobuf.UnknownFieldSet.newBuilder(); try { @@ -8316,6 +8418,17 @@ public final class LlapDaemonProtocolProtos { } break; } + case 8: { + int rawValue = input.readEnum(); + org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmissionStateProto value = org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmissionStateProto.valueOf(rawValue); + if (value == null) { + unknownFields.mergeVarintField(1, rawValue); + } else { + bitField0_ |= 0x00000001; + submissionState_ = value; + } + break; + } } } } catch (com.google.protobuf.InvalidProtocolBufferException e) { @@ -8355,7 +8468,25 @@ public final class LlapDaemonProtocolProtos { return PARSER; } + private int bitField0_; + // optional .SubmissionStateProto submission_state = 1; + public static final int SUBMISSION_STATE_FIELD_NUMBER = 1; + private org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmissionStateProto submissionState_; + /** + * optional .SubmissionStateProto submission_state = 1; + */ + public boolean hasSubmissionState() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + /** + * optional .SubmissionStateProto submission_state = 1; + */ + public org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmissionStateProto getSubmissionState() { + return submissionState_; + } + private void initFields() { + submissionState_ = org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmissionStateProto.ACCEPTED; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -8369,6 +8500,9 @@ public final class LlapDaemonProtocolProtos { public void writeTo(com.google.protobuf.CodedOutputStream output) throws java.io.IOException { getSerializedSize(); + if (((bitField0_ & 0x00000001) == 0x00000001)) { + output.writeEnum(1, submissionState_.getNumber()); + } getUnknownFields().writeTo(output); } @@ -8378,6 +8512,10 @@ public final class LlapDaemonProtocolProtos { if (size != -1) return size; size = 0; + if (((bitField0_ & 0x00000001) == 0x00000001)) { + size += com.google.protobuf.CodedOutputStream + .computeEnumSize(1, submissionState_.getNumber()); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -8401,6 +8539,11 @@ public final class LlapDaemonProtocolProtos { org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto other = (org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto) obj; boolean result = true; + result = result && (hasSubmissionState() == other.hasSubmissionState()); + if (hasSubmissionState()) { + result = result && + (getSubmissionState() == other.getSubmissionState()); + } result = result && getUnknownFields().equals(other.getUnknownFields()); return result; @@ -8414,6 +8557,10 @@ public final class LlapDaemonProtocolProtos { } int hash = 41; hash = (19 * hash) + getDescriptorForType().hashCode(); + if (hasSubmissionState()) { + hash = (37 * hash) + SUBMISSION_STATE_FIELD_NUMBER; + hash = (53 * hash) + hashEnum(getSubmissionState()); + } hash = (29 * hash) + getUnknownFields().hashCode(); memoizedHashCode = hash; return hash; @@ -8523,6 +8670,8 @@ public final class LlapDaemonProtocolProtos { public Builder clear() { super.clear(); + submissionState_ = org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmissionStateProto.ACCEPTED; + bitField0_ = (bitField0_ & ~0x00000001); return this; } @@ -8549,6 +8698,13 @@ public final class LlapDaemonProtocolProtos { public org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto buildPartial() { org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto result = new org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto(this); + int from_bitField0_ = bitField0_; + int to_bitField0_ = 0; + if (((from_bitField0_ & 0x00000001) == 0x00000001)) { + to_bitField0_ |= 0x00000001; + } + result.submissionState_ = submissionState_; + result.bitField0_ = to_bitField0_; onBuilt(); return result; } @@ -8564,6 +8720,9 @@ public final class LlapDaemonProtocolProtos { public Builder mergeFrom(org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto other) { if (other == org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto.getDefaultInstance()) return this; + if (other.hasSubmissionState()) { + setSubmissionState(other.getSubmissionState()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -8589,6 +8748,43 @@ public final class LlapDaemonProtocolProtos { } return this; } + private int bitField0_; + + // optional .SubmissionStateProto submission_state = 1; + private org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmissionStateProto submissionState_ = org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmissionStateProto.ACCEPTED; + /** + * optional .SubmissionStateProto submission_state = 1; + */ + public boolean hasSubmissionState() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + /** + * optional .SubmissionStateProto submission_state = 1; + */ + public org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmissionStateProto getSubmissionState() { + return submissionState_; + } + /** + * optional .SubmissionStateProto submission_state = 1; + */ + public Builder setSubmissionState(org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmissionStateProto value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000001; + submissionState_ = value; + onChanged(); + return this; + } + /** + * optional .SubmissionStateProto submission_state = 1; + */ + public Builder clearSubmissionState() { + bitField0_ = (bitField0_ & ~0x00000001); + submissionState_ = org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmissionStateProto.ACCEPTED; + onChanged(); + return this; + } // @@protoc_insertion_point(builder_scope:SubmitWorkResponseProto) } @@ -13565,30 +13761,33 @@ public final class LlapDaemonProtocolProtos { "ing\030\007 \001(\t\022\032\n\022app_attempt_number\030\010 \001(\005\022)\n" + "\rfragment_spec\030\t \001(\0132\022.FragmentSpecProto" + "\0223\n\025fragment_runtime_info\030\n \001(\0132\024.Fragme" + - "ntRuntimeInfo\"\031\n\027SubmitWorkResponseProto" + - "\"f\n\036SourceStateUpdatedRequestProto\022\020\n\010da" + - "g_name\030\001 \001(\t\022\020\n\010src_name\030\002 \001(\t\022 \n\005state\030" + - "\003 \001(\0162\021.SourceStateProto\"!\n\037SourceStateU" + - "pdatedResponseProto\"X\n\031QueryCompleteRequ" + - "estProto\022\020\n\010query_id\030\001 \001(\t\022\020\n\010dag_name\030\002" + - " \001(\t\022\027\n\014delete_delay\030\003 \001(\003:\0010\"\034\n\032QueryCo", - "mpleteResponseProto\"g\n\035TerminateFragment" + - "RequestProto\022\020\n\010query_id\030\001 \001(\t\022\020\n\010dag_na" + - "me\030\002 \001(\t\022\"\n\032fragment_identifier_string\030\007" + - " \001(\t\" \n\036TerminateFragmentResponseProto\"\026" + - "\n\024GetTokenRequestProto\"&\n\025GetTokenRespon" + - "seProto\022\r\n\005token\030\001 \001(\014*2\n\020SourceStatePro" + - "to\022\017\n\013S_SUCCEEDED\020\001\022\r\n\tS_RUNNING\020\0022\316\002\n\022L" + + "ntRuntimeInfo\"J\n\027SubmitWorkResponseProto" + + "\022/\n\020submission_state\030\001 \001(\0162\025.SubmissionS" + + "tateProto\"f\n\036SourceStateUpdatedRequestPr" + + "oto\022\020\n\010dag_name\030\001 \001(\t\022\020\n\010src_name\030\002 \001(\t\022" + + " \n\005state\030\003 \001(\0162\021.SourceStateProto\"!\n\037Sou" + + "rceStateUpdatedResponseProto\"X\n\031QueryCom" + + "pleteRequestProto\022\020\n\010query_id\030\001 \001(\t\022\020\n\010d", + "ag_name\030\002 \001(\t\022\027\n\014delete_delay\030\003 \001(\003:\0010\"\034" + + "\n\032QueryCompleteResponseProto\"g\n\035Terminat" + + "eFragmentRequestProto\022\020\n\010query_id\030\001 \001(\t\022" + + "\020\n\010dag_name\030\002 \001(\t\022\"\n\032fragment_identifier" + + "_string\030\007 \001(\t\" \n\036TerminateFragmentRespon" + + "seProto\"\026\n\024GetTokenRequestProto\"&\n\025GetTo" + + "kenResponseProto\022\r\n\005token\030\001 \001(\014*2\n\020Sourc" + + "eStateProto\022\017\n\013S_SUCCEEDED\020\001\022\r\n\tS_RUNNIN" + + "G\020\002*E\n\024SubmissionStateProto\022\014\n\010ACCEPTED\020" + + "\001\022\014\n\010REJECTED\020\002\022\021\n\rEVICTED_OTHER\020\0032\316\002\n\022L", "lapDaemonProtocol\022?\n\nsubmitWork\022\027.Submit" + "WorkRequestProto\032\030.SubmitWorkResponsePro" + - "to\022W\n\022sourceStateUpdated\022\037.SourceStateUp", + "to\022W\n\022sourceStateUpdated\022\037.SourceStateUp" + "datedRequestProto\032 .SourceStateUpdatedRe" + "sponseProto\022H\n\rqueryComplete\022\032.QueryComp" + "leteRequestProto\032\033.QueryCompleteResponse" + "Proto\022T\n\021terminateFragment\022\036.TerminateFr" + "agmentRequestProto\032\037.TerminateFragmentRe" + "sponseProto2]\n\026LlapManagementProtocol\022C\n" + - "\022getDelegationToken\022\025.GetTokenRequestPro" + + "\022getDelegationToken\022\025.GetTokenRequestPro", "to\032\026.GetTokenResponseProtoBH\n&org.apache" + ".hadoop.hive.llap.daemon.rpcB\030LlapDaemon" + "ProtocolProtos\210\001\001\240\001\001" @@ -13645,7 +13844,7 @@ public final class LlapDaemonProtocolProtos { internal_static_SubmitWorkResponseProto_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_SubmitWorkResponseProto_descriptor, - new java.lang.String[] { }); + new java.lang.String[] { "SubmissionState", }); internal_static_SourceStateUpdatedRequestProto_descriptor = getDescriptor().getMessageTypes().get(8); internal_static_SourceStateUpdatedRequestProto_fieldAccessorTable = new http://git-wip-us.apache.org/repos/asf/hive/blob/1d5e9c96/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/ContainerRunner.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/ContainerRunner.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/ContainerRunner.java index f3ce33b..fc29371 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/ContainerRunner.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/ContainerRunner.java @@ -16,19 +16,22 @@ package org.apache.hadoop.hive.llap.daemon; import java.io.IOException; -import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteRequestProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteResponseProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentRequestProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentResponseProto; public interface ContainerRunner { - void submitWork(SubmitWorkRequestProto request) throws IOException; + SubmitWorkResponseProto submitWork(SubmitWorkRequestProto request) throws IOException; - void sourceStateUpdated(SourceStateUpdatedRequestProto request); + SourceStateUpdatedResponseProto sourceStateUpdated(SourceStateUpdatedRequestProto request); - void queryComplete(QueryCompleteRequestProto request); + QueryCompleteResponseProto queryComplete(QueryCompleteRequestProto request); - void terminateFragment(TerminateFragmentRequestProto request); + TerminateFragmentResponseProto terminateFragment(TerminateFragmentRequestProto request); } http://git-wip-us.apache.org/repos/asf/hive/blob/1d5e9c96/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java index 2139bb0..0d85671 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java @@ -22,13 +22,11 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicReference; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; -import org.apache.hadoop.hive.llap.configuration.LlapConfiguration; import org.apache.hadoop.hive.llap.daemon.ContainerRunner; import org.apache.hadoop.hive.llap.daemon.FragmentCompletionHandler; import org.apache.hadoop.hive.llap.daemon.HistoryLogger; @@ -39,9 +37,14 @@ import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentS import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GroupInputSpecProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.IOSpecProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteRequestProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteResponseProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmissionStateProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentRequestProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentResponseProto; import org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorMetrics; import org.apache.hadoop.hive.llap.shufflehandler.ShuffleHandler; import org.apache.hadoop.hive.ql.exec.tez.TezProcessor; @@ -62,7 +65,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.base.Preconditions; - // TODO Convert this to a CompositeService public class ContainerRunnerImpl extends CompositeService implements ContainerRunner, FragmentCompletionHandler, QueryFailedHandler { @@ -145,7 +147,7 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu } @Override - public void submitWork(SubmitWorkRequestProto request) throws IOException { + public SubmitWorkResponseProto submitWork(SubmitWorkRequestProto request) throws IOException { HistoryLogger.logFragmentStart(request.getApplicationIdString(), request.getContainerIdString(), localAddress.get().getHostName(), request.getFragmentSpec().getDagName(), request.getFragmentSpec().getVertexName(), request.getFragmentSpec().getFragmentNumber(), @@ -157,6 +159,8 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu // TODO Reduce the length of this string. Way too verbose at the moment. String ndcContextString = request.getFragmentSpec().getFragmentIdentifierString(); NDC.push(ndcContextString); + Scheduler.SubmissionState submissionState; + SubmitWorkResponseProto.Builder responseBuilder = SubmitWorkResponseProto.newBuilder(); try { Map env = new HashMap<>(); // TODO What else is required in this environment map. @@ -191,7 +195,9 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu Token jobToken = TokenCache.getSessionToken(credentials); - LOG.debug("Registering request with the ShuffleHandler"); + if (LOG.isDebugEnabled()) { + LOG.debug("Registering request with the ShuffleHandler"); + } ShuffleHandler.get() .registerDag(request.getApplicationIdString(), dagIdentifier, jobToken, request.getUser(), localDirs); @@ -200,18 +206,27 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu new LlapExecutionContext(localAddress.get().getHostName(), queryTracker), env, credentials, memoryPerExecutor, amReporter, confParams, metrics, killedTaskHandler, this); - try { - executorService.schedule(callable); - } catch (RejectedExecutionException e) { + submissionState = executorService.schedule(callable); + + if (LOG.isInfoEnabled()) { + LOG.info("SubmissionState for {} : {} ", ndcContextString, submissionState); + } + + if (submissionState.equals(Scheduler.SubmissionState.REJECTED)) { // Stop tracking the fragment and re-throw the error. fragmentComplete(fragmentInfo); - throw e; + return responseBuilder + .setSubmissionState(SubmissionStateProto.valueOf(submissionState.name())) + .build(); } metrics.incrExecutorTotalRequestsHandled(); metrics.incrExecutorNumQueuedRequests(); } finally { NDC.pop(); } + + responseBuilder.setSubmissionState(SubmissionStateProto.valueOf(submissionState.name())); + return responseBuilder.build(); } private static class LlapExecutionContext extends ExecutionContextImpl @@ -230,14 +245,15 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu } @Override - public void sourceStateUpdated(SourceStateUpdatedRequestProto request) { + public SourceStateUpdatedResponseProto sourceStateUpdated(SourceStateUpdatedRequestProto request) { LOG.info("Processing state update: " + stringifySourceStateUpdateRequest(request)); queryTracker.registerSourceStateChange(request.getDagName(), request.getSrcName(), request.getState()); + return SourceStateUpdatedResponseProto.getDefaultInstance(); } @Override - public void queryComplete(QueryCompleteRequestProto request) { + public QueryCompleteResponseProto queryComplete(QueryCompleteRequestProto request) { LOG.info("Processing queryComplete notification for {}", request.getDagName()); List knownFragments = queryTracker.queryComplete(null, request.getDagName(), request.getDeleteDelay()); @@ -248,12 +264,14 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu fragmentInfo.getFragmentIdentifierString()); executorService.killFragment(fragmentInfo.getFragmentIdentifierString()); } + return QueryCompleteResponseProto.getDefaultInstance(); } @Override - public void terminateFragment(TerminateFragmentRequestProto request) { + public TerminateFragmentResponseProto terminateFragment(TerminateFragmentRequestProto request) { LOG.info("DBG: Received terminateFragment request for {}", request.getFragmentIdentifierString()); executorService.killFragment(request.getFragmentIdentifierString()); + return TerminateFragmentResponseProto.getDefaultInstance(); } private String stringifySourceStateUpdateRequest(SourceStateUpdatedRequestProto request) { http://git-wip-us.apache.org/repos/asf/hive/blob/1d5e9c96/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java index e1ecf64..467ab71 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java @@ -32,16 +32,20 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.llap.configuration.LlapConfiguration; import org.apache.hadoop.hive.llap.daemon.ContainerRunner; import org.apache.hadoop.hive.llap.daemon.QueryFailedHandler; -import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteRequestProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteResponseProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentRequestProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentResponseProto; import org.apache.hadoop.hive.llap.daemon.services.impl.LlapWebServices; import org.apache.hadoop.hive.llap.io.api.LlapProxy; import org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorMetrics; import org.apache.hadoop.hive.llap.metrics.LlapMetricsSystem; import org.apache.hadoop.hive.llap.metrics.MetricsUtils; +import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService; import org.apache.hadoop.hive.llap.shufflehandler.ShuffleHandler; import org.apache.hadoop.metrics2.util.MBeans; import org.apache.hadoop.service.CompositeService; @@ -313,25 +317,25 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla } @Override - public void submitWork(SubmitWorkRequestProto request) throws + public SubmitWorkResponseProto submitWork(SubmitWorkRequestProto request) throws IOException { numSubmissions.incrementAndGet(); - containerRunner.submitWork(request); + return containerRunner.submitWork(request); } @Override - public void sourceStateUpdated(SourceStateUpdatedRequestProto request) { - containerRunner.sourceStateUpdated(request); + public SourceStateUpdatedResponseProto sourceStateUpdated(SourceStateUpdatedRequestProto request) { + return containerRunner.sourceStateUpdated(request); } @Override - public void queryComplete(QueryCompleteRequestProto request) { - containerRunner.queryComplete(request); + public QueryCompleteResponseProto queryComplete(QueryCompleteRequestProto request) { + return containerRunner.queryComplete(request); } @Override - public void terminateFragment(TerminateFragmentRequestProto request) { - containerRunner.terminateFragment(request); + public TerminateFragmentResponseProto terminateFragment(TerminateFragmentRequestProto request) { + return containerRunner.terminateFragment(request); } @VisibleForTesting http://git-wip-us.apache.org/repos/asf/hive/blob/1d5e9c96/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolServerImpl.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolServerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolServerImpl.java index db0b752..f87fffe 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolServerImpl.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolServerImpl.java @@ -93,33 +93,29 @@ public class LlapDaemonProtocolServerImpl extends AbstractService SubmitWorkRequestProto request) throws ServiceException { try { - containerRunner.submitWork(request); + return containerRunner.submitWork(request); } catch (IOException e) { throw new ServiceException(e); } - return SubmitWorkResponseProto.getDefaultInstance(); } @Override public SourceStateUpdatedResponseProto sourceStateUpdated(RpcController controller, SourceStateUpdatedRequestProto request) throws ServiceException { - containerRunner.sourceStateUpdated(request); - return SourceStateUpdatedResponseProto.getDefaultInstance(); + return containerRunner.sourceStateUpdated(request); } @Override public QueryCompleteResponseProto queryComplete(RpcController controller, QueryCompleteRequestProto request) throws ServiceException { - containerRunner.queryComplete(request); - return QueryCompleteResponseProto.getDefaultInstance(); + return containerRunner.queryComplete(request); } @Override public TerminateFragmentResponseProto terminateFragment( RpcController controller, TerminateFragmentRequestProto request) throws ServiceException { - containerRunner.terminateFragment(request); - return TerminateFragmentResponseProto.getDefaultInstance(); + return containerRunner.terminateFragment(request); } @Override http://git-wip-us.apache.org/repos/asf/hive/blob/1d5e9c96/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/Scheduler.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/Scheduler.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/Scheduler.java index 1d35b10..26c8e55 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/Scheduler.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/Scheduler.java @@ -18,19 +18,24 @@ package org.apache.hadoop.hive.llap.daemon.impl; import java.util.Set; -import java.util.concurrent.RejectedExecutionException; /** * Task scheduler interface */ public interface Scheduler { + enum SubmissionState { + ACCEPTED, // request accepted + REJECTED, // request rejected as wait queue is full + EVICTED_OTHER; // request accepted but evicted other low priority task + } + /** * Schedule the task or throw RejectedExecutionException if queues are full * @param t - task to schedule - * @throws RejectedExecutionException + * @return SubmissionState */ - void schedule(T t) throws RejectedExecutionException; + SubmissionState schedule(T t); /** * Attempt to kill the fragment with the specified fragmentId http://git-wip-us.apache.org/repos/asf/hive/blob/1d5e9c96/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java index 5e2c6dd..34aa5c9 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java @@ -70,7 +70,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; * run to completion immediately (canFinish = false) are added to pre-emption queue. *

* When all the executor threads are occupied and wait queue is full, the task scheduler will - * throw RejectedExecutionException. + * return SubmissionState.REJECTED response *

* Task executor service can be shut down which will terminated all running tasks and reject all * new tasks. Shutting down of the task executor service can be done gracefully or immediately. @@ -316,9 +316,9 @@ public class TaskExecutorService extends AbstractService implements Scheduler() { @Override public void setResponse(SubmitWorkResponseProto response) { + if (response.hasSubmissionState()) { + LlapDaemonProtocolProtos.SubmissionStateProto ss = response.getSubmissionState(); + if (ss.equals(LlapDaemonProtocolProtos.SubmissionStateProto.REJECTED)) { + LOG.info( + "Unable to run task: " + taskSpec.getTaskAttemptID() + " on containerId: " + + containerId + ", Service Busy"); + getContext().taskKilled(taskSpec.getTaskAttemptID(), + TaskAttemptEndReason.EXECUTOR_BUSY, "Service Busy"); + return; + } + } else { + // TODO: Provide support for reporting errors + // This should never happen as server always returns a valid status on success + throw new RuntimeException("SubmissionState in response is expected!"); + } LOG.info("Successfully launched task: " + taskSpec.getTaskAttemptID()); } @@ -270,23 +283,13 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl { } if (t instanceof RemoteException) { RemoteException re = (RemoteException) t; - String message = re.toString(); - // RejectedExecutions from the remote service treated as KILLED - if (message.contains(RejectedExecutionException.class.getName())) { - LOG.info( - "Unable to run task: " + taskSpec.getTaskAttemptID() + " on containerId: " + - containerId + ", Service Busy"); - getContext().taskKilled(taskSpec.getTaskAttemptID(), - TaskAttemptEndReason.EXECUTOR_BUSY, "Service Busy"); - } else { - // All others from the remote service cause the task to FAIL. - LOG.info( - "Failed to run task: " + taskSpec.getTaskAttemptID() + " on containerId: " + - containerId, t); - getContext() - .taskFailed(taskSpec.getTaskAttemptID(), TaskAttemptEndReason.OTHER, - t.toString()); - } + // All others from the remote service cause the task to FAIL. + LOG.info( + "Failed to run task: " + taskSpec.getTaskAttemptID() + " on containerId: " + + containerId, t); + getContext() + .taskFailed(taskSpec.getTaskAttemptID(), TaskAttemptEndReason.OTHER, + t.toString()); } else { // Exception from the RPC layer - communication failure, consider as KILLED / service down. if (t instanceof IOException) { http://git-wip-us.apache.org/repos/asf/hive/blob/1d5e9c96/llap-server/src/protobuf/LlapDaemonProtocol.proto ---------------------------------------------------------------------- diff --git a/llap-server/src/protobuf/LlapDaemonProtocol.proto b/llap-server/src/protobuf/LlapDaemonProtocol.proto index 07721df..a2d944f 100644 --- a/llap-server/src/protobuf/LlapDaemonProtocol.proto +++ b/llap-server/src/protobuf/LlapDaemonProtocol.proto @@ -87,7 +87,14 @@ message SubmitWorkRequestProto { optional FragmentRuntimeInfo fragment_runtime_info = 10; } +enum SubmissionStateProto { + ACCEPTED = 1; + REJECTED = 2; + EVICTED_OTHER = 3; +} + message SubmitWorkResponseProto { + optional SubmissionStateProto submission_state = 1; } message SourceStateUpdatedRequestProto { http://git-wip-us.apache.org/repos/asf/hive/blob/1d5e9c96/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemonProtocolServerImpl.java ---------------------------------------------------------------------- diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemonProtocolServerImpl.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemonProtocolServerImpl.java index 0006a9a..44c958d 100644 --- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemonProtocolServerImpl.java +++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemonProtocolServerImpl.java @@ -14,8 +14,10 @@ package org.apache.hadoop.hive.llap.daemon.impl; +import static org.junit.Assert.assertEquals; import static org.mockito.Mockito.*; +import java.io.IOException; import java.net.InetSocketAddress; import java.util.concurrent.atomic.AtomicReference; @@ -27,21 +29,28 @@ import org.apache.hadoop.hive.llap.configuration.LlapConfiguration; import org.apache.hadoop.hive.llap.daemon.ContainerRunner; import org.apache.hadoop.hive.llap.daemon.LlapDaemonProtocolBlockingPB; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmissionStateProto; import org.junit.Test; public class TestLlapDaemonProtocolServerImpl { @Test(timeout = 10000) - public void test() throws ServiceException { + public void test() throws ServiceException, IOException { LlapConfiguration daemonConf = new LlapConfiguration(); int rpcPort = HiveConf.getIntVar(daemonConf, ConfVars.LLAP_DAEMON_RPC_PORT); int numHandlers = HiveConf.getIntVar(daemonConf, ConfVars.LLAP_DAEMON_RPC_NUM_HANDLERS); + ContainerRunner containerRunnerMock = mock(ContainerRunner.class); LlapDaemonProtocolServerImpl server = - new LlapDaemonProtocolServerImpl(numHandlers, mock(ContainerRunner.class), + new LlapDaemonProtocolServerImpl(numHandlers, containerRunnerMock, new AtomicReference(), new AtomicReference(), rpcPort, rpcPort + 1); - + when(containerRunnerMock.submitWork(any(SubmitWorkRequestProto.class))).thenReturn( + SubmitWorkResponseProto + .newBuilder() + .setSubmissionState(SubmissionStateProto.ACCEPTED) + .build()); try { server.init(new Configuration()); server.start(); @@ -50,10 +59,12 @@ public class TestLlapDaemonProtocolServerImpl { LlapDaemonProtocolBlockingPB client = new LlapDaemonProtocolClientImpl(new Configuration(), serverAddr.getHostName(), serverAddr.getPort(), null, null); - client.submitWork(null, + SubmitWorkResponseProto responseProto = client.submitWork(null, SubmitWorkRequestProto.newBuilder() .setAmHost("amhost") .setAmPort(2000).build()); + assertEquals(responseProto.getSubmissionState().name(), + SubmissionStateProto.ACCEPTED.name()); } finally { server.stop(); http://git-wip-us.apache.org/repos/asf/hive/blob/1d5e9c96/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java ---------------------------------------------------------------------- diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java index cb2d0e9..5491064 100644 --- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java +++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java @@ -22,7 +22,6 @@ import static org.apache.hadoop.hive.llap.daemon.impl.TaskExecutorTestHelpers.cr import static org.apache.hadoop.hive.llap.daemon.impl.TaskExecutorTestHelpers.createTaskWrapper; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; import java.util.HashMap; import java.util.Map; @@ -30,7 +29,6 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.PriorityBlockingQueue; -import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; @@ -124,28 +122,17 @@ public class TestTaskExecutorService { // TODO HIVE-11687. Remove the awaitStart once offer can handle (waitQueueSize + numFreeExecutionSlots) // This currently serves to allow the task to be removed from the waitQueue. r1.awaitStart(); - try { - taskExecutorService.schedule(r2); - } catch (RejectedExecutionException e) { - fail("Unexpected rejection with space available in queue"); - } - try { - taskExecutorService.schedule(r3); - } catch (RejectedExecutionException e) { - fail("Unexpected rejection with space available in queue"); - } + Scheduler.SubmissionState submissionState = taskExecutorService.schedule(r2); + assertEquals(Scheduler.SubmissionState.ACCEPTED, submissionState); - try { - taskExecutorService.schedule(r4); - fail("Expecting a Rejection for non finishable task with a full queue"); - } catch (RejectedExecutionException e) { - } + submissionState = taskExecutorService.schedule(r3); + assertEquals(Scheduler.SubmissionState.ACCEPTED, submissionState); - try { - taskExecutorService.schedule(r5); - } catch (RejectedExecutionException e) { - fail("Unexpected rejection for a finishable task"); - } + submissionState = taskExecutorService.schedule(r4); + assertEquals(Scheduler.SubmissionState.REJECTED, submissionState); + + submissionState = taskExecutorService.schedule(r5); + assertEquals(Scheduler.SubmissionState.EVICTED_OTHER, submissionState); // Ensure the correct task was preempted. assertEquals(true, r3.wasPreempted());