Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id C3E4D200ACA for ; Thu, 9 Jun 2016 20:57:08 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id C2729160A58; Thu, 9 Jun 2016 18:57:08 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 9C103160A29 for ; Thu, 9 Jun 2016 20:57:06 +0200 (CEST) Received: (qmail 82614 invoked by uid 500); 9 Jun 2016 18:57:05 -0000 Mailing-List: contact commits-help@hive.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: hive-dev@hive.apache.org Delivered-To: mailing list commits@hive.apache.org Received: (qmail 82591 invoked by uid 99); 9 Jun 2016 18:57:05 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 09 Jun 2016 18:57:05 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 86854DFF32; Thu, 9 Jun 2016 18:57:05 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sershe@apache.org To: commits@hive.apache.org Date: Thu, 09 Jun 2016 18:57:05 -0000 Message-Id: <0e60712f15014be3b4ba81eb82f78383@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/3] hive git commit: HIVE-13675 : LLAP: add HMAC signatures to LLAPIF splits (Sergey Shelukhin, reviewed by Siddharth Seth) archived-at: Thu, 09 Jun 2016 18:57:09 -0000 Repository: hive Updated Branches: refs/heads/master f67c862e3 -> b895d9d60 HIVE-13675 : LLAP: add HMAC signatures to LLAPIF splits (Sergey Shelukhin, reviewed by Siddharth Seth) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/6a59cfd5 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/6a59cfd5 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/6a59cfd5 Branch: refs/heads/master Commit: 6a59cfd5e1e724697279b285eebed851c49ecdaa Parents: f67c862 Author: Sergey Shelukhin Authored: Thu Jun 9 11:47:41 2016 -0700 Committer: Sergey Shelukhin Committed: Thu Jun 9 11:47:41 2016 -0700 ---------------------------------------------------------------------- .../org/apache/hadoop/hive/conf/HiveConf.java | 5 + .../apache/hadoop/hive/llap/SubmitWorkInfo.java | 54 +++++-- .../hive/llap/coordinator/LlapCoordinator.java | 159 +++++++++++++++++++ .../ext/LlapTaskUmbilicalExternalClient.java | 26 +-- .../hadoop/hive/llap/io/api/LlapProxy.java | 20 +-- .../llap/security/LlapTokenLocalClient.java | 59 ------- .../llap/security/LlapTokenLocalClientImpl.java | 61 +++++++ .../hadoop/hive/llap/security/LlapSigner.java | 4 +- .../hive/llap/security/LlapSignerImpl.java | 73 +++++++++ .../llap/security/LlapTokenLocalClient.java | 30 ++++ .../hive/llap/security/SecretManager.java | 6 + .../llap/security/SigningSecretManager.java | 1 + .../apache/hadoop/hive/llap/tez/Converters.java | 9 +- .../hadoop/hive/llap/tez/TestConverters.java | 2 +- .../hadoop/hive/llap/LlapBaseInputFormat.java | 90 +++++------ .../llap/daemon/impl/ContainerRunnerImpl.java | 2 +- .../hive/llap/security/LlapSignerImpl.java | 60 ------- .../hive/llap/security/TestLlapSignerImpl.java | 7 +- .../llap/tezplugins/LlapTaskCommunicator.java | 2 +- ql/pom.xml | 1 + .../hive/ql/exec/tez/TezSessionState.java | 43 ++--- .../ql/udf/generic/GenericUDTFGetSplits.java | 78 +++++++-- .../apache/hive/service/server/HiveServer2.java | 10 ++ 23 files changed, 540 insertions(+), 262 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/6a59cfd5/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index fe69ffa..285caa3 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -2888,6 +2888,11 @@ public class HiveConf extends Configuration { LLAP_ENABLE_GRACE_JOIN_IN_LLAP("hive.llap.enable.grace.join.in.llap", false, "Override if grace join should be allowed to run in llap."), + LLAP_HS2_ENABLE_COORDINATOR("hive.llap.hs2.coordinator.enabled", true, + "Whether to create the LLAP coordinator; since execution engine and container vs llap\n" + + "settings are both coming from job configs, we don't know at start whether this should\n" + + "be created. Default true."), + SPARK_CLIENT_FUTURE_TIMEOUT("hive.spark.client.future.timeout", "60s", new TimeValidator(TimeUnit.SECONDS), "Timeout for requests from Hive client to remote Spark driver."), http://git-wip-us.apache.org/repos/asf/hive/blob/6a59cfd5/llap-client/src/java/org/apache/hadoop/hive/llap/SubmitWorkInfo.java ---------------------------------------------------------------------- diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/SubmitWorkInfo.java b/llap-client/src/java/org/apache/hadoop/hive/llap/SubmitWorkInfo.java index 6704294..95b0ffc 100644 --- a/llap-client/src/java/org/apache/hadoop/hive/llap/SubmitWorkInfo.java +++ b/llap-client/src/java/org/apache/hadoop/hive/llap/SubmitWorkInfo.java @@ -28,33 +28,32 @@ import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.tez.common.security.JobTokenIdentifier; import org.apache.tez.common.security.JobTokenSecretManager; -import org.apache.tez.runtime.api.impl.TaskSpec; public class SubmitWorkInfo implements Writable { - private TaskSpec taskSpec; private ApplicationId fakeAppId; private long creationTime; + private byte[] vertexSpec, vertexSpecSignature; // This is used to communicate over the LlapUmbilicalProtocol. Not related to tokens used to // talk to LLAP daemons itself via the securit work. private Token token; + private int vertexParallelism; - public SubmitWorkInfo(TaskSpec taskSpec, ApplicationId fakeAppId, long creationTime) { - this.taskSpec = taskSpec; + public SubmitWorkInfo(ApplicationId fakeAppId, long creationTime, + int vertexParallelism, byte[] vertexSpec, byte[] vertexSpecSignature) { this.fakeAppId = fakeAppId; this.token = createJobToken(); this.creationTime = creationTime; + this.vertexSpec = vertexSpec; + this.vertexSpecSignature = vertexSpecSignature; + this.vertexParallelism = vertexParallelism; } // Empty constructor for writable etc. public SubmitWorkInfo() { } - public TaskSpec getTaskSpec() { - return taskSpec; - } - public ApplicationId getFakeAppId() { return fakeAppId; } @@ -73,23 +72,44 @@ public class SubmitWorkInfo implements Writable { @Override public void write(DataOutput out) throws IOException { - taskSpec.write(out); out.writeLong(fakeAppId.getClusterTimestamp()); out.writeInt(fakeAppId.getId()); token.write(out); out.writeLong(creationTime); + out.writeInt(vertexParallelism); + if (vertexSpec != null) { + out.writeInt(vertexSpec.length); + out.write(vertexSpec); + } else { + out.writeInt(0); + } + if (vertexSpecSignature != null) { + out.writeInt(vertexSpecSignature.length); + out.write(vertexSpecSignature); + } else { + out.writeInt(0); + } } @Override public void readFields(DataInput in) throws IOException { - taskSpec = new TaskSpec(); - taskSpec.readFields(in); long appIdTs = in.readLong(); int appIdId = in.readInt(); fakeAppId = ApplicationId.newInstance(appIdTs, appIdId); token = new Token<>(); token.readFields(in); creationTime = in.readLong(); + vertexParallelism = in.readInt(); + int vertexSpecBytes = in.readInt(); + if (vertexSpecBytes > 0) { + vertexSpec = new byte[vertexSpecBytes]; + in.readFully(vertexSpec); + } + int vertexSpecSignBytes = in.readInt(); + if (vertexSpecSignBytes > 0) { + vertexSpecSignature = new byte[vertexSpecSignBytes]; + in.readFully(vertexSpecSignature); + } } public static byte[] toBytes(SubmitWorkInfo submitWorkInfo) throws IOException { @@ -116,4 +136,16 @@ public class SubmitWorkInfo implements Writable { sessionToken.setService(identifier.getJobId()); return sessionToken; } + + public byte[] getVertexBinary() { + return vertexSpec; + } + + public byte[] getVertexSignature() { + return vertexSpecSignature; + } + + public int getVertexParallelism() { + return vertexParallelism; + } } http://git-wip-us.apache.org/repos/asf/hive/blob/6a59cfd5/llap-client/src/java/org/apache/hadoop/hive/llap/coordinator/LlapCoordinator.java ---------------------------------------------------------------------- diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/coordinator/LlapCoordinator.java b/llap-client/src/java/org/apache/hadoop/hive/llap/coordinator/LlapCoordinator.java new file mode 100644 index 0000000..f55779b --- /dev/null +++ b/llap-client/src/java/org/apache/hadoop/hive/llap/coordinator/LlapCoordinator.java @@ -0,0 +1,159 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.llap.coordinator; + +import java.io.IOException; +import java.util.Random; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.llap.DaemonId; +import org.apache.hadoop.hive.llap.LlapUtil; +import org.apache.hadoop.hive.llap.coordinator.LlapCoordinator; +import org.apache.hadoop.hive.llap.security.LlapSigner; +import org.apache.hadoop.hive.llap.security.LlapSignerImpl; +import org.apache.hadoop.hive.llap.security.LlapTokenLocalClient; +import org.apache.hadoop.hive.llap.security.LlapTokenLocalClientImpl; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.RemovalListener; +import com.google.common.cache.RemovalNotification; + +/** + * The class containing facilities for LLAP interactions in HS2. + * This may eventually evolve into a central LLAP manager hosted by HS2 or elsewhere. + * Refactor as needed. + */ +public class LlapCoordinator { + private static final Logger LOG = LoggerFactory.getLogger(LlapCoordinator.class); + + /** We'll keep signers per cluster around for some time, for reuse. */ + private final Cache signers = CacheBuilder.newBuilder().removalListener( + new RemovalListener() { + @Override + public void onRemoval(RemovalNotification notification) { + if (notification.getValue() != null) { + notification.getValue().close(); + } + } + }).expireAfterAccess(10, TimeUnit.MINUTES).build(); + + // TODO: probably temporary before HIVE-13698; after that we may create one per session. + private static final Cache localClientCache = CacheBuilder + .newBuilder().expireAfterAccess(10, TimeUnit.MINUTES) + .removalListener(new RemovalListener() { + @Override + public void onRemoval(RemovalNotification notification) { + if (notification.getValue() != null) { + notification.getValue().close(); + } + } + }).build(); + + private HiveConf hiveConf; + private String clusterUser; + + LlapCoordinator() { + } + + private void init(HiveConf hiveConf) throws IOException { + // Only do the lightweight stuff in ctor; by default, LLAP coordinator is created during + // HS2 init without the knowledge of LLAP usage (or lack thereof) in the cluster. + this.hiveConf = hiveConf; + this.clusterUser = UserGroupInformation.getCurrentUser().getShortUserName(); + } + + public LlapSigner getLlapSigner(final Configuration jobConf) { + // Note that we create the cluster name from user conf (hence, a user can target a cluster), + // but then we create the signer using hiveConf (hence, we control the ZK config and stuff). + assert UserGroupInformation.isSecurityEnabled(); + final String clusterId = DaemonId.createClusterString( + clusterUser, LlapUtil.generateClusterName(jobConf)); + try { + return signers.get(clusterId, new Callable() { + public LlapSigner call() throws Exception { + return new LlapSignerImpl(hiveConf, clusterId); + } + }); + } catch (ExecutionException e) { + throw new RuntimeException(e); + } + } + + public ApplicationId createExtClientAppId() { + // TODO: moved from UDTF; need JIRA to generate this properly (no dups, etc.)... + return ApplicationId.newInstance(Math.abs(new Random().nextInt()), 0); + // Note that we cannot allow users to provide app ID, since providing somebody else's appId + // would give one LLAP token (and splits) for that app ID. If we could verify it somehow + // (YARN token? nothing we can do in an UDF), we could get it from client already running on + // YARN. As such, the clients running on YARN will have two app IDs to be aware of. + // TODO: Perhaps they can give us their app id as an argument to the UDF, and we'd just append + // a unique string here, for easier tracking? + } + + public LlapTokenLocalClient getLocalTokenClient( + final Configuration conf, String clusterUser) throws IOException { + // Note that we create the cluster name from user conf (hence, a user can target a cluster), + // but then we create the signer using hiveConf (hence, we control the ZK config and stuff). + assert UserGroupInformation.isSecurityEnabled(); + String clusterName = LlapUtil.generateClusterName(conf); + // This assumes that the LLAP cluster and session are both running under HS2 user. + final String clusterId = DaemonId.createClusterString(clusterUser, clusterName); + try { + return localClientCache.get(clusterId, new Callable() { + @Override + public LlapTokenLocalClientImpl call() throws Exception { + return new LlapTokenLocalClientImpl(hiveConf, clusterId); + } + }); + } catch (ExecutionException e) { + throw new IOException(e); + } + } + + public void close() { + try { + localClientCache.invalidateAll(); + signers.invalidateAll(); + localClientCache.cleanUp(); + signers.cleanUp(); + } catch (Exception ex) { + LOG.error("Error closing the coordinator; ignoring", ex); + } + } + + /** TODO: ideally, when the splits UDF is made a proper API, coordinator should not + * be managed as a global. HS2 should create it and then pass it around. */ + private static final LlapCoordinator INSTANCE = new LlapCoordinator(); + public static void initializeInstance(HiveConf hiveConf) throws IOException { + INSTANCE.init(hiveConf); + } + + public static LlapCoordinator getInstance() { + return INSTANCE; + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/6a59cfd5/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java ---------------------------------------------------------------------- diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java b/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java index 0edb1cd..e21481e 100644 --- a/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java +++ b/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java @@ -27,7 +27,8 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import com.google.common.base.Preconditions; +import com.google.protobuf.InvalidProtocolBufferException; + import org.apache.commons.collections4.ListUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; @@ -35,6 +36,7 @@ import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SignableVertexSpec; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.VertexIdentifier; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.VertexOrBinary; import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol; import org.apache.hadoop.hive.llap.tez.Converters; import org.apache.hadoop.hive.llap.tez.LlapProtocolClientProxy; @@ -43,12 +45,9 @@ import org.apache.hadoop.io.Text; import org.apache.hadoop.ipc.ProtocolSignature; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.service.AbstractService; -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.tez.common.security.JobTokenIdentifier; import org.apache.tez.dag.api.TezException; import org.apache.tez.dag.records.TezTaskAttemptID; -import org.apache.tez.runtime.api.Event; import org.apache.tez.runtime.api.impl.EventType; import org.apache.tez.runtime.api.impl.TezEvent; import org.apache.tez.runtime.api.impl.TezHeartbeatRequest; @@ -139,14 +138,23 @@ public class LlapTaskUmbilicalExternalClient extends AbstractService { /** * Submit the work for actual execution. - * @param submitWorkRequestProto + * @throws InvalidProtocolBufferException */ - public void submitWork(final SubmitWorkRequestProto submitWorkRequestProto, String llapHost, int llapPort, List tezEvents) { + public void submitWork( + SubmitWorkRequestProto request, String llapHost, int llapPort, List tezEvents) { // Register the pending events to be sent for this spec. - SignableVertexSpec vertex = submitWorkRequestProto.getWorkSpec().getVertex(); + VertexOrBinary vob = request.getWorkSpec(); + assert vob.hasVertexBinary() != vob.hasVertex(); + SignableVertexSpec vertex = null; + try { + vertex = vob.hasVertex() ? vob.getVertex() + : SignableVertexSpec.parseFrom(vob.getVertexBinary()); + } catch (InvalidProtocolBufferException e) { + throw new RuntimeException(e); + } VertexIdentifier vId = vertex.getVertexIdentifier(); TezTaskAttemptID attemptId = Converters.createTaskAttemptId( - vId, submitWorkRequestProto.getFragmentNumber(), submitWorkRequestProto.getAttemptNumber()); + vId, request.getFragmentNumber(), request.getAttemptNumber()); final String fragmentId = attemptId.toString(); PendingEventData pendingEventData = new PendingEventData( @@ -159,7 +167,7 @@ public class LlapTaskUmbilicalExternalClient extends AbstractService { connectionTimeout, connectionTimeout, TimeUnit.MILLISECONDS); // Send out the actual SubmitWorkRequest - communicator.sendSubmitWork(submitWorkRequestProto, llapHost, llapPort, + communicator.sendSubmitWork(request, llapHost, llapPort, new LlapProtocolClientProxy.ExecuteRequestCallback() { @Override http://git-wip-us.apache.org/repos/asf/hive/blob/6a59cfd5/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapProxy.java ---------------------------------------------------------------------- diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapProxy.java b/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapProxy.java index 6c2618b..8f1b59b 100644 --- a/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapProxy.java +++ b/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapProxy.java @@ -17,14 +17,13 @@ */ package org.apache.hadoop.hive.llap.io.api; -import java.io.IOException; import java.lang.reflect.Constructor; - import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.llap.coordinator.LlapCoordinator; @SuppressWarnings("rawtypes") public class LlapProxy { - private final static String IMPL_CLASS = "org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl"; + private final static String IO_IMPL_CLASS = "org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl"; // Llap server depends on Hive execution, so the reverse cannot be true. We create the I/O // singleton once (on daemon startup); the said singleton serves as the IO interface. @@ -48,23 +47,18 @@ public class LlapProxy { if (io != null) { return; // already initialized } - - try { - io = createIoImpl(conf); - } catch (IOException e) { - throw new RuntimeException("Cannot initialize local server", e); - } + io = createInstance(IO_IMPL_CLASS, conf); } - private static LlapIo createIoImpl(Configuration conf) throws IOException { + private static T createInstance(String className, Configuration conf) { try { @SuppressWarnings("unchecked") - Class clazz = (Class)Class.forName(IMPL_CLASS); - Constructor ctor = clazz.getDeclaredConstructor(Configuration.class); + Class clazz = (Class)Class.forName(className); + Constructor ctor = clazz.getDeclaredConstructor(Configuration.class); ctor.setAccessible(true); return ctor.newInstance(conf); } catch (Exception e) { - throw new RuntimeException("Failed to create impl class", e); + throw new RuntimeException("Failed to create " + className, e); } } http://git-wip-us.apache.org/repos/asf/hive/blob/6a59cfd5/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenLocalClient.java ---------------------------------------------------------------------- diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenLocalClient.java b/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenLocalClient.java deleted file mode 100644 index af889b6..0000000 --- a/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenLocalClient.java +++ /dev/null @@ -1,59 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hive.llap.security; - -import java.io.IOException; - - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.security.token.Token; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class LlapTokenLocalClient { - private static final Logger LOG = LoggerFactory.getLogger(LlapTokenLocalClient.class); - private final SecretManager secretManager; - - public LlapTokenLocalClient(Configuration conf, String clusterId) { - // TODO: create this centrally in HS2 case - secretManager = SecretManager.createSecretManager(conf, clusterId); - } - - public Token createToken( - String appId, String user, boolean isSignatureRequired) throws IOException { - try { - Token token = secretManager.createLlapToken( - appId, user, isSignatureRequired); - if (LOG.isInfoEnabled()) { - LOG.info("Created a LLAP delegation token locally: " + token); - } - return token; - } catch (Exception ex) { - throw (ex instanceof IOException) ? (IOException)ex : new IOException(ex); - } - } - - public void close() { - try { - secretManager.stopThreads(); - } catch (Exception ex) { - // Ignore. - } - } -} http://git-wip-us.apache.org/repos/asf/hive/blob/6a59cfd5/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenLocalClientImpl.java ---------------------------------------------------------------------- diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenLocalClientImpl.java b/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenLocalClientImpl.java new file mode 100644 index 0000000..78510b0 --- /dev/null +++ b/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenLocalClientImpl.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.llap.security; + +import java.io.IOException; + + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.token.Token; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class LlapTokenLocalClientImpl implements LlapTokenLocalClient { + private static final Logger LOG = LoggerFactory.getLogger(LlapTokenLocalClientImpl.class); + private final SecretManager secretManager; + + public LlapTokenLocalClientImpl(Configuration conf, String clusterId) { + // TODO: create this centrally in HS2 case + secretManager = SecretManager.createSecretManager(conf, clusterId); + } + + @Override + public Token createToken( + String appId, String user, boolean isSignatureRequired) throws IOException { + try { + Token token = secretManager.createLlapToken( + appId, user, isSignatureRequired); + if (LOG.isInfoEnabled()) { + LOG.info("Created a LLAP delegation token locally: " + token); + } + return token; + } catch (Exception ex) { + throw (ex instanceof IOException) ? (IOException)ex : new IOException(ex); + } + } + + @Override + public void close() { + try { + secretManager.stopThreads(); + } catch (Exception ex) { + // Ignore. + } + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/6a59cfd5/llap-common/src/java/org/apache/hadoop/hive/llap/security/LlapSigner.java ---------------------------------------------------------------------- diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/security/LlapSigner.java b/llap-common/src/java/org/apache/hadoop/hive/llap/security/LlapSigner.java index 478a40a..7b9c8cc 100644 --- a/llap-common/src/java/org/apache/hadoop/hive/llap/security/LlapSigner.java +++ b/llap-common/src/java/org/apache/hadoop/hive/llap/security/LlapSigner.java @@ -24,7 +24,7 @@ public interface LlapSigner { /** An object signable by a signer. */ public interface Signable { /** Called by the signer to record key information as part of the message to be signed. */ - void setSignInfo(int masterKeyId, String user); + void setSignInfo(int masterKeyId); /** Called by the signer to get the serialized representation of the message to be signed. */ byte[] serialize() throws IOException; } @@ -38,4 +38,6 @@ public interface LlapSigner { SignedMessage serializeAndSign(Signable message) throws IOException; void checkSignature(byte[] message, byte[] signature, int keyId); + + void close(); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/6a59cfd5/llap-common/src/java/org/apache/hadoop/hive/llap/security/LlapSignerImpl.java ---------------------------------------------------------------------- diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/security/LlapSignerImpl.java b/llap-common/src/java/org/apache/hadoop/hive/llap/security/LlapSignerImpl.java new file mode 100644 index 0000000..7f10505 --- /dev/null +++ b/llap-common/src/java/org/apache/hadoop/hive/llap/security/LlapSignerImpl.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.llap.security; + +import java.io.IOException; +import java.util.Arrays; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.delegation.DelegationKey; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.annotations.VisibleForTesting; + +public class LlapSignerImpl implements LlapSigner { + private static final Logger LOG = LoggerFactory.getLogger(LlapSignerImpl.class); + + private final SigningSecretManager secretManager; + + public LlapSignerImpl(Configuration conf, String clusterId) { + // TODO: create this centrally in HS2 case + assert UserGroupInformation.isSecurityEnabled(); + secretManager = SecretManager.createSecretManager(conf, clusterId); + } + + @VisibleForTesting + public LlapSignerImpl(SigningSecretManager sm) { + secretManager = sm; + } + + @Override + public SignedMessage serializeAndSign(Signable message) throws IOException { + SignedMessage result = new SignedMessage(); + DelegationKey key = secretManager.getCurrentKey(); + message.setSignInfo(key.getKeyId()); + result.message = message.serialize(); + result.signature = secretManager.signWithKey(result.message, key); + return result; + } + + @Override + public void checkSignature(byte[] message, byte[] signature, int keyId) + throws SecurityException { + byte[] expectedSignature = secretManager.signWithKey(message, keyId); + if (Arrays.equals(signature, expectedSignature)) return; + throw new SecurityException("Message signature does not match"); + } + + @Override + public void close() { + try { + secretManager.close(); + } catch (Exception ex) { + LOG.error("Error closing the signer", ex); + } + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/6a59cfd5/llap-common/src/java/org/apache/hadoop/hive/llap/security/LlapTokenLocalClient.java ---------------------------------------------------------------------- diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/security/LlapTokenLocalClient.java b/llap-common/src/java/org/apache/hadoop/hive/llap/security/LlapTokenLocalClient.java new file mode 100644 index 0000000..fd09652 --- /dev/null +++ b/llap-common/src/java/org/apache/hadoop/hive/llap/security/LlapTokenLocalClient.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.llap.security; + +import java.io.IOException; + +import org.apache.hadoop.security.token.Token; + +public interface LlapTokenLocalClient { + Token createToken( + String appId, String user, boolean isSignatureRequired) throws IOException; + + void close(); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/6a59cfd5/llap-common/src/java/org/apache/hadoop/hive/llap/security/SecretManager.java ---------------------------------------------------------------------- diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/security/SecretManager.java b/llap-common/src/java/org/apache/hadoop/hive/llap/security/SecretManager.java index 540f978..58a8e96 100644 --- a/llap-common/src/java/org/apache/hadoop/hive/llap/security/SecretManager.java +++ b/llap-common/src/java/org/apache/hadoop/hive/llap/security/SecretManager.java @@ -179,6 +179,7 @@ public class SecretManager extends ZKDelegationTokenSecretManager() { @Override @@ -222,6 +223,11 @@ public class SecretManager extends ZKDelegationTokenSecretManager implements InputFormat { +public class LlapBaseInputFormat> + implements InputFormat { private static final Logger LOG = LoggerFactory.getLogger(LlapBaseInputFormat.class); @@ -148,12 +129,14 @@ public class LlapBaseInputFormat implements InputF LlapRecordReaderTaskUmbilicalExternalResponder umbilicalResponder = new LlapRecordReaderTaskUmbilicalExternalResponder(); + // TODO: close this LlapTaskUmbilicalExternalClient llapClient = new LlapTaskUmbilicalExternalClient(job, submitWorkInfo.getTokenIdentifier(), submitWorkInfo.getToken(), umbilicalResponder); llapClient.init(job); llapClient.start(); + // TODO# vertex in this SubmitWorkRequestProto submitWorkRequestProto = constructSubmitWorkRequestProto(submitWorkInfo, llapSplit.getSplitNum(), llapClient.getAddress(), submitWorkInfo.getToken()); @@ -169,9 +152,8 @@ public class LlapBaseInputFormat implements InputF String id = HiveConf.getVar(job, HiveConf.ConfVars.HIVEQUERYID) + "_" + llapSplit.getSplitNum(); - HiveConf conf = new HiveConf(); - Socket socket = new Socket(host, - serviceInstance.getOutputFormatPort()); + // TODO: security for output channel + Socket socket = new Socket(host, serviceInstance.getOutputFormatPort()); LOG.debug("Socket connected"); @@ -181,7 +163,9 @@ public class LlapBaseInputFormat implements InputF LOG.info("Registered id: " + id); - LlapBaseRecordReader recordReader = new LlapBaseRecordReader(socket.getInputStream(), llapSplit.getSchema(), Text.class, job); + @SuppressWarnings("rawtypes") + LlapBaseRecordReader recordReader = new LlapBaseRecordReader( + socket.getInputStream(), llapSplit.getSchema(), Text.class, job); umbilicalResponder.setRecordReader(recordReader); return recordReader; } @@ -294,26 +278,22 @@ public class LlapBaseInputFormat implements InputF return null; } - private SubmitWorkRequestProto constructSubmitWorkRequestProto(SubmitWorkInfo submitWorkInfo, + private SubmitWorkRequestProto constructSubmitWorkRequestProto( + SubmitWorkInfo submitWorkInfo, int taskNum, InetSocketAddress address, Token token) throws IOException { - TaskSpec taskSpec = submitWorkInfo.getTaskSpec(); ApplicationId appId = submitWorkInfo.getFakeAppId(); - int attemptId = taskSpec.getTaskAttemptID().getId(); // This works, assuming the executor is running within YARN. String user = System.getenv(ApplicationConstants.Environment.USER.name()); LOG.info("Setting user in submitWorkRequest to: " + user); - SignableVertexSpec svs = Converters.convertTaskSpecToProto( - taskSpec, attemptId, appId.toString(), null, user); // TODO signatureKeyId + // TODO: this is bogus. What does LLAP use this for? ContainerId containerId = - ContainerId.newInstance(ApplicationAttemptId.newInstance(appId, 0), taskNum); - + ContainerId.newInstance(ApplicationAttemptId.newInstance(appId, 0), taskNum); - Credentials taskCredentials = new Credentials(); // Credentials can change across DAGs. Ideally construct only once per DAG. Credentials credentials = new Credentials(); TokenCache.setSessionToken(token, credentials); @@ -324,15 +304,20 @@ public class LlapBaseInputFormat implements InputF runtimeInfo.setWithinDagPriority(0); runtimeInfo.setDagStartTime(submitWorkInfo.getCreationTime()); runtimeInfo.setFirstAttemptStartTime(submitWorkInfo.getCreationTime()); - runtimeInfo.setNumSelfAndUpstreamTasks(taskSpec.getVertexParallelism()); + runtimeInfo.setNumSelfAndUpstreamTasks(submitWorkInfo.getVertexParallelism()); runtimeInfo.setNumSelfAndUpstreamCompletedTasks(0); SubmitWorkRequestProto.Builder builder = SubmitWorkRequestProto.newBuilder(); - builder.setWorkSpec(VertexOrBinary.newBuilder().setVertex(svs).build()); - // TODO work spec signature - builder.setFragmentNumber(taskSpec.getTaskAttemptID().getTaskID().getId()); - builder.setAttemptNumber(0); + VertexOrBinary.Builder vertexBuilder = VertexOrBinary.newBuilder(); + vertexBuilder.setVertexBinary(ByteString.copyFrom(submitWorkInfo.getVertexBinary())); + if (submitWorkInfo.getVertexSignature() != null) { + // Unsecure case? + builder.setWorkSpecSignature(ByteString.copyFrom(submitWorkInfo.getVertexSignature())); + } + builder.setWorkSpec(vertexBuilder.build()); + builder.setFragmentNumber(taskNum); + builder.setAttemptNumber(0); // TODO: hmm builder.setContainerIdString(containerId.toString()); builder.setAmHost(address.getHostName()); builder.setAmPort(address.getPort()); @@ -351,7 +336,7 @@ public class LlapBaseInputFormat implements InputF } private static class LlapRecordReaderTaskUmbilicalExternalResponder implements LlapTaskUmbilicalExternalResponder { - protected LlapBaseRecordReader recordReader = null; + protected LlapBaseRecordReader recordReader = null; protected LinkedBlockingQueue queuedEvents = new LinkedBlockingQueue(); public LlapRecordReaderTaskUmbilicalExternalResponder() { @@ -369,6 +354,7 @@ public class LlapBaseInputFormat implements InputF @Override public void heartbeat(TezHeartbeatRequest request) { + // TODO: why is this ignored? TezTaskAttemptID taskAttemptId = request.getCurrentTaskAttemptID(); List inEvents = request.getEvents(); for (TezEvent tezEvent : ListUtils.emptyIfNull(inEvents)) { @@ -415,7 +401,7 @@ public class LlapBaseInputFormat implements InputF } } - public synchronized LlapBaseRecordReader getRecordReader() { + public synchronized LlapBaseRecordReader getRecordReader() { return recordReader; } @@ -441,7 +427,7 @@ public class LlapBaseInputFormat implements InputF * @param readerEvent */ protected synchronized void sendOrQueueEvent(ReaderEvent readerEvent) { - LlapBaseRecordReader recordReader = getRecordReader(); + LlapBaseRecordReader recordReader = getRecordReader(); if (recordReader != null) { recordReader.handleEvent(readerEvent); } else { http://git-wip-us.apache.org/repos/asf/hive/blob/6a59cfd5/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java index 6f21d3c..387000d 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java @@ -113,7 +113,7 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu this.localShufflePort = localShufflePort; this.amReporter = amReporter; this.signer = UserGroupInformation.isSecurityEnabled() - ? new LlapSignerImpl(conf, daemonId) : null; + ? new LlapSignerImpl(conf, daemonId.getClusterString()) : null; this.fsUgiFactory = fsUgiFactory; this.clusterId = daemonId.getClusterString(); http://git-wip-us.apache.org/repos/asf/hive/blob/6a59cfd5/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapSignerImpl.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapSignerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapSignerImpl.java deleted file mode 100644 index 4174593..0000000 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapSignerImpl.java +++ /dev/null @@ -1,60 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hive.llap.security; - -import java.io.IOException; -import java.util.Arrays; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.llap.DaemonId; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.token.delegation.DelegationKey; - -import com.google.common.annotations.VisibleForTesting; - -public class LlapSignerImpl implements LlapSigner { - private final SigningSecretManager secretManager; - - public LlapSignerImpl(Configuration conf, DaemonId daemonId) { - // TODO: create this centrally in HS2 case - secretManager = SecretManager.createSecretManager(conf, daemonId.getClusterString()); - } - - @VisibleForTesting - public LlapSignerImpl(SigningSecretManager sm) { - secretManager = sm; - } - - @Override - public SignedMessage serializeAndSign(Signable message) throws IOException { - SignedMessage result = new SignedMessage(); - DelegationKey key = secretManager.getCurrentKey(); - message.setSignInfo(key.getKeyId(), UserGroupInformation.getCurrentUser().getUserName()); - result.message = message.serialize(); - result.signature = secretManager.signWithKey(result.message, key); - return result; - } - - @Override - public void checkSignature(byte[] message, byte[] signature, int keyId) - throws SecurityException { - byte[] expectedSignature = secretManager.signWithKey(message, keyId); - if (Arrays.equals(signature, expectedSignature)) return; - throw new SecurityException("Message signature does not match"); - } -} http://git-wip-us.apache.org/repos/asf/hive/blob/6a59cfd5/llap-server/src/test/org/apache/hadoop/hive/llap/security/TestLlapSignerImpl.java ---------------------------------------------------------------------- diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/security/TestLlapSignerImpl.java b/llap-server/src/test/org/apache/hadoop/hive/llap/security/TestLlapSignerImpl.java index 0420225..a281fd6 100644 --- a/llap-server/src/test/org/apache/hadoop/hive/llap/security/TestLlapSignerImpl.java +++ b/llap-server/src/test/org/apache/hadoop/hive/llap/security/TestLlapSignerImpl.java @@ -127,7 +127,7 @@ public class TestLlapSignerImpl { } @Override - public void setSignInfo(int masterKeyId, String user) { + public void setSignInfo(int masterKeyId) { this.masterKeyId = masterKeyId; } @@ -196,5 +196,10 @@ public class TestLlapSignerImpl { public AbstractDelegationTokenIdentifier createIdentifier() { throw new UnsupportedOperationException(); } + + @Override + public void close() { + stopThreads(); + } } } http://git-wip-us.apache.org/repos/asf/hive/blob/6a59cfd5/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java ---------------------------------------------------------------------- diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java index 026df3b..80096f5 100644 --- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java +++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java @@ -599,7 +599,7 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl { } builder.setCredentialsBinary(ByteString.copyFrom(credentialsBinary)); builder.setWorkSpec(VertexOrBinary.newBuilder().setVertex(Converters.convertTaskSpecToProto( - taskSpec, appAttemptId, getTokenIdentifier(), null, user)).build()); + taskSpec, appAttemptId, getTokenIdentifier(), user)).build()); // Don't call builder.setWorkSpecSignature() - Tez doesn't sign fragments builder.setFragmentRuntimeInfo(fragmentRuntimeInfo); return builder.build(); http://git-wip-us.apache.org/repos/asf/hive/blob/6a59cfd5/ql/pom.xml ---------------------------------------------------------------------- diff --git a/ql/pom.xml b/ql/pom.xml index db03978..02ddb80 100644 --- a/ql/pom.xml +++ b/ql/pom.xml @@ -812,6 +812,7 @@ org.apache.hive:hive-common org.apache.hive:hive-exec org.apache.hive:hive-serde + org.apache.hive:hive-llap-common org.apache.hive:hive-llap-client org.apache.hive:hive-metastore org.apache.hive:hive-service-rpc http://git-wip-us.apache.org/repos/asf/hive/blob/6a59cfd5/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java index d04cfa3..919b35a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java @@ -51,12 +51,11 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; -import org.apache.hadoop.hive.llap.DaemonId; -import org.apache.hadoop.hive.llap.LlapUtil; +import org.apache.hadoop.hive.llap.coordinator.LlapCoordinator; import org.apache.hadoop.hive.llap.impl.LlapProtocolClientImpl; +import org.apache.hadoop.hive.llap.io.api.LlapProxy; import org.apache.hadoop.hive.llap.security.LlapTokenClient; import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier; -import org.apache.hadoop.hive.llap.security.LlapTokenLocalClient; import org.apache.hadoop.hive.llap.tez.LlapProtocolClientProxy; import org.apache.hadoop.hive.llap.tezplugins.LlapContainerLauncher; import org.apache.hadoop.hive.llap.tezplugins.LlapTaskCommunicator; @@ -85,11 +84,6 @@ import org.apache.tez.serviceplugins.api.TaskSchedulerDescriptor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.RemovalListener; -import com.google.common.cache.RemovalNotification; - /** * Holds session state related to Tez */ @@ -338,19 +332,6 @@ public class TezSessionState { } } - // Only cache ZK connections (ie local clients); these are presumed to be used in HS2. - // TODO: temporary before HIVE-13698. - private static final Cache localClientCache = CacheBuilder - .newBuilder().expireAfterAccess(10, TimeUnit.MINUTES) - .removalListener(new RemovalListener() { - @Override - public void onRemoval(RemovalNotification notification) { - if (notification.getValue() != null) { - notification.getValue().close(); - } - } - }).build(); - private static Token getLlapToken( String user, final Configuration conf) throws IOException { // TODO: parts of this should be moved out of TezSession to reuse the clients, but there's @@ -359,21 +340,17 @@ public class TezSessionState { boolean isInHs2 = session != null && session.isHiveServerQuery(); Token token = null; // For Tez, we don't use appId to distinguish the tokens. + + LlapCoordinator coordinator = null; if (isInHs2) { // We are in HS2, get the token locally. - String clusterName = LlapUtil.generateClusterName(conf); - // This assumes that the LLAP cluster and session are both running under HS2 user. - final String clusterId = DaemonId.createClusterString(user, clusterName); - try { - token = localClientCache.get(clusterId, new Callable() { - @Override - public LlapTokenLocalClient call() throws Exception { - return new LlapTokenLocalClient(conf, clusterId); - } - }).createToken(null, null, false); // Signature is not required for Tez. - } catch (ExecutionException e) { - throw new IOException(e); + // TODO: coordinator should be passed in; HIVE-13698. Must be initialized for now. + coordinator = LlapCoordinator.getInstance(); + if (coordinator == null) { + throw new IOException("LLAP coordinator not initialized; cannot get LLAP tokens"); } + // Signing is not required for Tez. + token = coordinator.getLocalTokenClient(conf, user).createToken(null, null, false); } else { // We are not in HS2; always create a new client for now. token = new LlapTokenClient(conf).getDelegationToken(null); http://git-wip-us.apache.org/repos/asf/hive/blob/6a59cfd5/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java index 140dbda..7be6f7b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java @@ -28,7 +28,6 @@ import java.net.URISyntaxException; import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import java.util.Random; import java.util.Set; import java.util.UUID; @@ -49,6 +48,12 @@ import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.llap.Schema; import org.apache.hadoop.hive.llap.FieldDesc; import org.apache.hadoop.hive.llap.TypeDesc; +import org.apache.hadoop.hive.llap.coordinator.LlapCoordinator; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SignableVertexSpec; +import org.apache.hadoop.hive.llap.security.LlapSigner; +import org.apache.hadoop.hive.llap.security.LlapSigner.Signable; +import org.apache.hadoop.hive.llap.security.LlapSigner.SignedMessage; +import org.apache.hadoop.hive.llap.tez.Converters; import org.apache.hadoop.hive.ql.CommandNeedRetryException; import org.apache.hadoop.hive.ql.Context; import org.apache.hadoop.hive.ql.Driver; @@ -61,7 +66,6 @@ import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException; import org.apache.hadoop.hive.ql.exec.tez.DagUtils; import org.apache.hadoop.hive.ql.exec.tez.HiveSplitGenerator; import org.apache.hadoop.hive.ql.exec.tez.TezTask; -import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.MapWork; import org.apache.hadoop.hive.ql.plan.TezWork; @@ -91,7 +95,6 @@ import org.apache.hadoop.yarn.api.records.LocalResourceType; import org.apache.tez.dag.api.DAG; import org.apache.tez.dag.api.TaskLocationHint; import org.apache.tez.dag.api.TaskSpecBuilder; -import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.Vertex; import org.apache.tez.mapreduce.grouper.TezSplitGrouper; import org.apache.tez.runtime.api.Event; @@ -112,11 +115,8 @@ import com.google.common.base.Preconditions; + "Returns an array of length int serialized splits for the referenced tables string.") @UDFType(deterministic = false) public class GenericUDTFGetSplits extends GenericUDTF { - private static final Logger LOG = LoggerFactory.getLogger(GenericUDTFGetSplits.class); - private static final String LLAP_INTERNAL_INPUT_FORMAT_NAME = "org.apache.hadoop.hive.llap.LlapInputFormat"; - protected transient StringObjectInspector stringOI; protected transient IntObjectInspector intOI; protected transient JobConf jc; @@ -307,10 +307,10 @@ public class GenericUDTFGetSplits extends GenericUDTF { Preconditions.checkState(HiveConf.getBoolVar(wxConf, HiveConf.ConfVars.LLAP_CLIENT_CONSISTENT_SPLITS)); HiveSplitGenerator splitGenerator = new HiveSplitGenerator(wxConf, mapWork); + // TODO: these need to be signed, or at least the splits inside. List eventList = splitGenerator.initialize(); InputSplit[] result = new InputSplit[eventList.size() - 1]; - DataOutputBuffer dob = new DataOutputBuffer(); InputConfigureVertexTasksEvent configureEvent = (InputConfigureVertexTasksEvent) eventList.get(0); @@ -324,19 +324,35 @@ public class GenericUDTFGetSplits extends GenericUDTF { LOG.debug("NumSplits=" + result.length); } - ApplicationId fakeApplicationId - = ApplicationId.newInstance(Math.abs(new Random().nextInt()), 0); + LlapCoordinator coordinator = LlapCoordinator.getInstance(); + if (coordinator == null) { + throw new IOException("LLAP coordinator is not initialized; must be running in HS2 with " + + ConfVars.LLAP_HS2_ENABLE_COORDINATOR.varname + " enabled"); + } + + // See the discussion in the implementation as to why we generate app ID. + ApplicationId fakeApplicationId = coordinator.createExtClientAppId(); + // This assumes LLAP cluster owner is always the HS2 user. String llapUser = UserGroupInformation.getLoginUser().getShortUserName(); + LlapSigner signer = UserGroupInformation.isSecurityEnabled() + ? coordinator.getLlapSigner(job) : null; + LOG.info("Number of splits: " + (eventList.size() - 1)); + SignedMessage signedSvs = null; + DataOutputBuffer dob = new DataOutputBuffer(); for (int i = 0; i < eventList.size() - 1; i++) { - - TaskSpec taskSpec = - new TaskSpecBuilder().constructTaskSpec(dag, vertexName, + TaskSpec taskSpec = new TaskSpecBuilder().constructTaskSpec(dag, vertexName, eventList.size() - 1, fakeApplicationId, i); - SubmitWorkInfo submitWorkInfo = - new SubmitWorkInfo(taskSpec, fakeApplicationId, System.currentTimeMillis()); + if (i == 0) { + // Despite the differences in TaskSpec, the vertex spec should be the same. + signedSvs = createSignedVertexSpec(signer, taskSpec, fakeApplicationId); + } + + SubmitWorkInfo submitWorkInfo = new SubmitWorkInfo(fakeApplicationId, + System.currentTimeMillis(), taskSpec.getVertexParallelism(), signedSvs.message, + signedSvs.signature); EventMetaData sourceMetaData = new EventMetaData(EventMetaData.EventProducerConsumerType.INPUT, vertexName, "NULL_VERTEX", null); @@ -372,6 +388,40 @@ public class GenericUDTFGetSplits extends GenericUDTF { } } + private SignedMessage createSignedVertexSpec(LlapSigner signer, TaskSpec taskSpec, + ApplicationId fakeApplicationId) throws IOException { + // We put the query user, not LLAP user, in the message (and the token later?) + String user = SessionState.getUserFromAuthenticator(); + if (user == null) { + user = UserGroupInformation.getCurrentUser().getUserName(); + LOG.warn("Cannot determine the session user; using " + user + " instead"); + } + final SignableVertexSpec.Builder svsb = Converters.convertTaskSpecToProto( + taskSpec, 0, fakeApplicationId.toString(), user); + if (signer == null) { + SignedMessage result = new SignedMessage(); + result.message = serializeVertexSpec(svsb); + return result; + } + return signer.serializeAndSign(new Signable() { + @Override + public void setSignInfo(int masterKeyId) { + svsb.setSignatureKeyId(masterKeyId); + } + + @Override + public byte[] serialize() throws IOException { + return serializeVertexSpec(svsb); + } + }); + } + + private static byte[] serializeVertexSpec(SignableVertexSpec.Builder svsb) throws IOException { + ByteArrayOutputStream os = new ByteArrayOutputStream(); + svsb.build().writeTo(os); + return os.toByteArray(); + } + /** * Returns a local resource representing a jar. This resource will be used to execute the plan on * the cluster. http://git-wip-us.apache.org/repos/asf/hive/blob/6a59cfd5/service/src/java/org/apache/hive/service/server/HiveServer2.java ---------------------------------------------------------------------- diff --git a/service/src/java/org/apache/hive/service/server/HiveServer2.java b/service/src/java/org/apache/hive/service/server/HiveServer2.java index d61edf5..14e1e0d 100644 --- a/service/src/java/org/apache/hive/service/server/HiveServer2.java +++ b/service/src/java/org/apache/hive/service/server/HiveServer2.java @@ -50,6 +50,8 @@ import org.apache.hadoop.hive.common.cli.CommonCliOptions; import org.apache.hadoop.hive.common.metrics.common.MetricsFactory; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.llap.coordinator.LlapCoordinator; +import org.apache.hadoop.hive.llap.io.api.LlapProxy; import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManagerImpl; import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolManager; import org.apache.hadoop.hive.ql.util.ZooKeeperHiveHelper; @@ -133,6 +135,14 @@ public class HiveServer2 extends CompositeService { } catch (Throwable t) { throw new Error("Unable to intitialize HiveServer2", t); } + if (HiveConf.getBoolVar(hiveConf, ConfVars.LLAP_HS2_ENABLE_COORDINATOR)) { + // See method comment. + try { + LlapCoordinator.initializeInstance(hiveConf); + } catch (IOException e) { + throw new RuntimeException(e); + } + } // Setup web UI try { if (hiveConf.getBoolVar(ConfVars.HIVE_IN_TEST)) {