hive-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ser...@apache.org
Subject [1/3] hive git commit: HIVE-13675 : LLAP: add HMAC signatures to LLAPIF splits (Sergey Shelukhin, reviewed by Siddharth Seth)
Date Thu, 09 Jun 2016 18:57:05 GMT
Repository: hive
Updated Branches:
  refs/heads/master f67c862e3 -> b895d9d60


HIVE-13675 : LLAP: add HMAC signatures to LLAPIF splits (Sergey Shelukhin, reviewed by Siddharth Seth)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/6a59cfd5
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/6a59cfd5
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/6a59cfd5

Branch: refs/heads/master
Commit: 6a59cfd5e1e724697279b285eebed851c49ecdaa
Parents: f67c862
Author: Sergey Shelukhin <sershe@apache.org>
Authored: Thu Jun 9 11:47:41 2016 -0700
Committer: Sergey Shelukhin <sershe@apache.org>
Committed: Thu Jun 9 11:47:41 2016 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   5 +
 .../apache/hadoop/hive/llap/SubmitWorkInfo.java |  54 +++++--
 .../hive/llap/coordinator/LlapCoordinator.java  | 159 +++++++++++++++++++
 .../ext/LlapTaskUmbilicalExternalClient.java    |  26 +--
 .../hadoop/hive/llap/io/api/LlapProxy.java      |  20 +--
 .../llap/security/LlapTokenLocalClient.java     |  59 -------
 .../llap/security/LlapTokenLocalClientImpl.java |  61 +++++++
 .../hadoop/hive/llap/security/LlapSigner.java   |   4 +-
 .../hive/llap/security/LlapSignerImpl.java      |  73 +++++++++
 .../llap/security/LlapTokenLocalClient.java     |  30 ++++
 .../hive/llap/security/SecretManager.java       |   6 +
 .../llap/security/SigningSecretManager.java     |   1 +
 .../apache/hadoop/hive/llap/tez/Converters.java |   9 +-
 .../hadoop/hive/llap/tez/TestConverters.java    |   2 +-
 .../hadoop/hive/llap/LlapBaseInputFormat.java   |  90 +++++------
 .../llap/daemon/impl/ContainerRunnerImpl.java   |   2 +-
 .../hive/llap/security/LlapSignerImpl.java      |  60 -------
 .../hive/llap/security/TestLlapSignerImpl.java  |   7 +-
 .../llap/tezplugins/LlapTaskCommunicator.java   |   2 +-
 ql/pom.xml                                      |   1 +
 .../hive/ql/exec/tez/TezSessionState.java       |  43 ++---
 .../ql/udf/generic/GenericUDTFGetSplits.java    |  78 +++++++--
 .../apache/hive/service/server/HiveServer2.java |  10 ++
 23 files changed, 540 insertions(+), 262 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/6a59cfd5/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index fe69ffa..285caa3 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2888,6 +2888,11 @@ public class HiveConf extends Configuration {
     LLAP_ENABLE_GRACE_JOIN_IN_LLAP("hive.llap.enable.grace.join.in.llap", false,
         "Override if grace join should be allowed to run in llap."),
 
+    LLAP_HS2_ENABLE_COORDINATOR("hive.llap.hs2.coordinator.enabled", true,
+        "Whether to create the LLAP coordinator; since execution engine and container vs llap\n" +
+        "settings are both coming from job configs, we don't know at start whether this should\n" +
+        "be created. Default true."),
+
     SPARK_CLIENT_FUTURE_TIMEOUT("hive.spark.client.future.timeout",
       "60s", new TimeValidator(TimeUnit.SECONDS),
       "Timeout for requests from Hive client to remote Spark driver."),

http://git-wip-us.apache.org/repos/asf/hive/blob/6a59cfd5/llap-client/src/java/org/apache/hadoop/hive/llap/SubmitWorkInfo.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/SubmitWorkInfo.java b/llap-client/src/java/org/apache/hadoop/hive/llap/SubmitWorkInfo.java
index 6704294..95b0ffc 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/SubmitWorkInfo.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/SubmitWorkInfo.java
@@ -28,33 +28,32 @@ import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.tez.common.security.JobTokenIdentifier;
 import org.apache.tez.common.security.JobTokenSecretManager;
-import org.apache.tez.runtime.api.impl.TaskSpec;
 
 public class SubmitWorkInfo implements Writable {
 
-  private TaskSpec taskSpec;
   private ApplicationId fakeAppId;
   private long creationTime;
+  private byte[] vertexSpec, vertexSpecSignature;
 
   // This is used to communicate over the LlapUmbilicalProtocol. Not related to tokens used to
   // talk to LLAP daemons itself via the securit work.
   private Token<JobTokenIdentifier> token;
+  private int vertexParallelism;
 
-  public SubmitWorkInfo(TaskSpec taskSpec, ApplicationId fakeAppId, long creationTime) {
-    this.taskSpec = taskSpec;
+  public SubmitWorkInfo(ApplicationId fakeAppId, long creationTime,
+      int vertexParallelism, byte[] vertexSpec, byte[] vertexSpecSignature) {
     this.fakeAppId = fakeAppId;
     this.token = createJobToken();
     this.creationTime = creationTime;
+    this.vertexSpec = vertexSpec;
+    this.vertexSpecSignature = vertexSpecSignature;
+    this.vertexParallelism = vertexParallelism;
   }
 
   // Empty constructor for writable etc.
   public SubmitWorkInfo() {
   }
 
-  public TaskSpec getTaskSpec() {
-    return taskSpec;
-  }
-
   public ApplicationId getFakeAppId() {
     return fakeAppId;
   }
@@ -73,23 +72,44 @@ public class SubmitWorkInfo implements Writable {
 
   @Override
   public void write(DataOutput out) throws IOException {
-    taskSpec.write(out);
     out.writeLong(fakeAppId.getClusterTimestamp());
     out.writeInt(fakeAppId.getId());
     token.write(out);
     out.writeLong(creationTime);
+    out.writeInt(vertexParallelism);
+    if (vertexSpec != null) {
+      out.writeInt(vertexSpec.length);
+      out.write(vertexSpec);
+    } else {
+      out.writeInt(0);
+    }
+    if (vertexSpecSignature != null) {
+      out.writeInt(vertexSpecSignature.length);
+      out.write(vertexSpecSignature);
+    } else {
+      out.writeInt(0);
+    }
   }
 
   @Override
   public void readFields(DataInput in) throws IOException {
-    taskSpec = new TaskSpec();
-    taskSpec.readFields(in);
     long appIdTs = in.readLong();
     int appIdId = in.readInt();
     fakeAppId = ApplicationId.newInstance(appIdTs, appIdId);
     token = new Token<>();
     token.readFields(in);
     creationTime = in.readLong();
+    vertexParallelism = in.readInt();
+    int vertexSpecBytes = in.readInt();
+    vertexSpec = vertexSpecBytes > 0 ? new byte[vertexSpecBytes] : null; // reset even if absent
+    if (vertexSpec != null) {
+      in.readFully(vertexSpec);
+    }
+    int vertexSpecSignBytes = in.readInt();
+    vertexSpecSignature = vertexSpecSignBytes > 0 ? new byte[vertexSpecSignBytes] : null;
+    if (vertexSpecSignature != null) {
+      in.readFully(vertexSpecSignature);
+    }
   }
 
   public static byte[] toBytes(SubmitWorkInfo submitWorkInfo) throws IOException {
@@ -116,4 +136,16 @@ public class SubmitWorkInfo implements Writable {
     sessionToken.setService(identifier.getJobId());
     return sessionToken;
   }
+
+  public byte[] getVertexBinary() {
+    return vertexSpec;
+  }
+
+  public byte[] getVertexSignature() {
+    return vertexSpecSignature;
+  }
+
+  public int getVertexParallelism() {
+    return vertexParallelism;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/6a59cfd5/llap-client/src/java/org/apache/hadoop/hive/llap/coordinator/LlapCoordinator.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/coordinator/LlapCoordinator.java b/llap-client/src/java/org/apache/hadoop/hive/llap/coordinator/LlapCoordinator.java
new file mode 100644
index 0000000..f55779b
--- /dev/null
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/coordinator/LlapCoordinator.java
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.coordinator;
+
+import java.io.IOException;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.llap.DaemonId;
+import org.apache.hadoop.hive.llap.LlapUtil;
+import org.apache.hadoop.hive.llap.coordinator.LlapCoordinator;
+import org.apache.hadoop.hive.llap.security.LlapSigner;
+import org.apache.hadoop.hive.llap.security.LlapSignerImpl;
+import org.apache.hadoop.hive.llap.security.LlapTokenLocalClient;
+import org.apache.hadoop.hive.llap.security.LlapTokenLocalClientImpl;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+
+/**
+ * The class containing facilities for LLAP interactions in HS2.
+ * This may eventually evolve into a central LLAP manager hosted by HS2 or elsewhere.
+ * Refactor as needed.
+ */
+public class LlapCoordinator {
+  private static final Logger LOG = LoggerFactory.getLogger(LlapCoordinator.class);
+
+  /** We'll keep signers per cluster around for some time, for reuse. */
+  private final Cache<String, LlapSigner> signers = CacheBuilder.newBuilder().removalListener(
+      new RemovalListener<String, LlapSigner>() {
+        @Override
+        public void onRemoval(RemovalNotification<String, LlapSigner> notification) {
+          if (notification.getValue() != null) {
+            notification.getValue().close();
+          }
+        }
+      }).expireAfterAccess(10, TimeUnit.MINUTES).build();
+
+  // TODO: probably temporary before HIVE-13698; after that we may create one per session.
+  private static final Cache<String, LlapTokenLocalClient> localClientCache = CacheBuilder
+      .newBuilder().expireAfterAccess(10, TimeUnit.MINUTES)
+      .removalListener(new RemovalListener<String, LlapTokenLocalClient>() {
+        @Override
+        public void onRemoval(RemovalNotification<String, LlapTokenLocalClient> notification) {
+          if (notification.getValue() != null) {
+            notification.getValue().close();
+          }
+        }
+      }).build();
+
+  private HiveConf hiveConf;
+  private String clusterUser;
+
+  LlapCoordinator() {
+  }
+
+  private void init(HiveConf hiveConf) throws IOException {
+    // Only do the lightweight stuff here (the ctor does nothing); by default, LLAP coordinator is
+    // created during HS2 init without the knowledge of LLAP usage (or lack thereof) in the cluster.
+    this.hiveConf = hiveConf;
+    this.clusterUser = UserGroupInformation.getCurrentUser().getShortUserName();
+  }
+
+  public LlapSigner getLlapSigner(final Configuration jobConf) {
+    // Note that we create the cluster name from user conf (hence, a user can target a cluster),
+    // but then we create the signer using hiveConf (hence, we control the ZK config and stuff).
+    assert UserGroupInformation.isSecurityEnabled();
+    final String clusterId = DaemonId.createClusterString(
+        clusterUser, LlapUtil.generateClusterName(jobConf));
+    try {
+      return signers.get(clusterId, new Callable<LlapSigner>() {
+        public LlapSigner call() throws Exception {
+          return new LlapSignerImpl(hiveConf, clusterId);
+        }
+      });
+    } catch (ExecutionException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public ApplicationId createExtClientAppId() {
+    // Note that we cannot allow users to provide app ID, since providing somebody else's appId
+    // would give one LLAP token (and splits) for that app ID. If we could verify it somehow
+    // (YARN token? nothing we can do in an UDF), we could get it from client already running on
+    // YARN. As such, the clients running on YARN will have two app IDs to be aware of.
+    // TODO: Perhaps they can give us their app id as an argument to the UDF, and we'd just append
+    // a unique string here, for easier tracking?
+    // TODO: moved from UDTF; need JIRA to generate this properly (no dups, etc.)...
+    return ApplicationId.newInstance(new Random().nextInt(Integer.MAX_VALUE), 0); // never < 0
+  }
+
+  public LlapTokenLocalClient getLocalTokenClient(
+      final Configuration conf, String clusterUser) throws IOException {
+    // Note that we create the cluster name from user conf (hence, a user can target a cluster),
+    // but then we create the token client using hiveConf (hence, we control the ZK config and stuff).
+    assert UserGroupInformation.isSecurityEnabled();
+    String clusterName = LlapUtil.generateClusterName(conf);
+    // The clusterUser argument shadows the field of the same name; the argument is what is used.
+    final String clusterId = DaemonId.createClusterString(clusterUser, clusterName);
+    try {
+      return localClientCache.get(clusterId, new Callable<LlapTokenLocalClientImpl>() {
+        @Override
+        public LlapTokenLocalClientImpl call() throws Exception {
+          return new LlapTokenLocalClientImpl(hiveConf, clusterId);
+        }
+      });
+    } catch (ExecutionException e) {
+      throw new IOException(e);
+    }
+  }
+
+  public void close() {
+    try {
+      localClientCache.invalidateAll();
+      signers.invalidateAll();
+      localClientCache.cleanUp();
+      signers.cleanUp();
+    } catch (Exception ex) {
+      LOG.error("Error closing the coordinator; ignoring", ex);
+    }
+  }
+
+  /** TODO: ideally, when the splits UDF is made a proper API, coordinator should not
+   *        be managed as a global. HS2 should create it and then pass it around. */
+  private static final LlapCoordinator INSTANCE = new LlapCoordinator();
+  public static void initializeInstance(HiveConf hiveConf) throws IOException {
+    INSTANCE.init(hiveConf);
+  }
+
+  public static LlapCoordinator getInstance() {
+    return INSTANCE;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/6a59cfd5/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java b/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
index 0edb1cd..e21481e 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
@@ -27,7 +27,8 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
-import com.google.common.base.Preconditions;
+import com.google.protobuf.InvalidProtocolBufferException;
+
 import org.apache.commons.collections4.ListUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -35,6 +36,7 @@ import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SignableVertexSpec;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.VertexIdentifier;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.VertexOrBinary;
 import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol;
 import org.apache.hadoop.hive.llap.tez.Converters;
 import org.apache.hadoop.hive.llap.tez.LlapProtocolClientProxy;
@@ -43,12 +45,9 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ipc.ProtocolSignature;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.service.AbstractService;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.tez.common.security.JobTokenIdentifier;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.impl.EventType;
 import org.apache.tez.runtime.api.impl.TezEvent;
 import org.apache.tez.runtime.api.impl.TezHeartbeatRequest;
@@ -139,14 +138,23 @@ public class LlapTaskUmbilicalExternalClient extends AbstractService {
 
   /**
    * Submit the work for actual execution.
-   * @param submitWorkRequestProto
+   * @throws RuntimeException if the serialized vertex spec in the request cannot be parsed
    */
-  public void submitWork(final SubmitWorkRequestProto submitWorkRequestProto, String llapHost, int llapPort, List<TezEvent> tezEvents) {
+  public void submitWork(
+      SubmitWorkRequestProto request, String llapHost, int llapPort, List<TezEvent> tezEvents) {
     // Register the pending events to be sent for this spec.
-    SignableVertexSpec vertex = submitWorkRequestProto.getWorkSpec().getVertex();
+    VertexOrBinary vob = request.getWorkSpec();
+    assert vob.hasVertexBinary() != vob.hasVertex();
+    SignableVertexSpec vertex = null;
+    try {
+      vertex = vob.hasVertex() ? vob.getVertex()
+          : SignableVertexSpec.parseFrom(vob.getVertexBinary());
+    } catch (InvalidProtocolBufferException e) {
+      throw new RuntimeException(e);
+    }
     VertexIdentifier vId = vertex.getVertexIdentifier();
     TezTaskAttemptID attemptId = Converters.createTaskAttemptId(
-        vId, submitWorkRequestProto.getFragmentNumber(), submitWorkRequestProto.getAttemptNumber());
+        vId, request.getFragmentNumber(), request.getAttemptNumber());
     final String fragmentId = attemptId.toString();
 
     PendingEventData pendingEventData = new PendingEventData(
@@ -159,7 +167,7 @@ public class LlapTaskUmbilicalExternalClient extends AbstractService {
         connectionTimeout, connectionTimeout, TimeUnit.MILLISECONDS);
 
     // Send out the actual SubmitWorkRequest
-    communicator.sendSubmitWork(submitWorkRequestProto, llapHost, llapPort,
+    communicator.sendSubmitWork(request, llapHost, llapPort,
         new LlapProtocolClientProxy.ExecuteRequestCallback<LlapDaemonProtocolProtos.SubmitWorkResponseProto>() {
 
           @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/6a59cfd5/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapProxy.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapProxy.java b/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapProxy.java
index 6c2618b..8f1b59b 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapProxy.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapProxy.java
@@ -17,14 +17,13 @@
  */
 package org.apache.hadoop.hive.llap.io.api;
 
-import java.io.IOException;
 import java.lang.reflect.Constructor;
-
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.llap.coordinator.LlapCoordinator;
 
 @SuppressWarnings("rawtypes")
 public class LlapProxy {
-  private final static String IMPL_CLASS = "org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl";
+  private final static String IO_IMPL_CLASS = "org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl";
 
   // Llap server depends on Hive execution, so the reverse cannot be true. We create the I/O
   // singleton once (on daemon startup); the said singleton serves as the IO interface.
@@ -48,23 +47,18 @@ public class LlapProxy {
     if (io != null) {
       return; // already initialized
     }
-
-    try {
-      io = createIoImpl(conf);
-    } catch (IOException e) {
-      throw new RuntimeException("Cannot initialize local server", e);
-    }
+    io = createInstance(IO_IMPL_CLASS, conf);
   }
 
-  private static LlapIo createIoImpl(Configuration conf) throws IOException {
+  private static <T> T createInstance(String className, Configuration conf) {
     try {
       @SuppressWarnings("unchecked")
-      Class<? extends LlapIo> clazz = (Class<? extends LlapIo>)Class.forName(IMPL_CLASS);
-      Constructor<? extends LlapIo> ctor = clazz.getDeclaredConstructor(Configuration.class);
+      Class<? extends T> clazz = (Class<? extends T>)Class.forName(className);
+      Constructor<? extends T> ctor = clazz.getDeclaredConstructor(Configuration.class);
       ctor.setAccessible(true);
       return ctor.newInstance(conf);
     } catch (Exception e) {
-      throw new RuntimeException("Failed to create impl class", e);
+      throw new RuntimeException("Failed to create " + className, e);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/6a59cfd5/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenLocalClient.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenLocalClient.java b/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenLocalClient.java
deleted file mode 100644
index af889b6..0000000
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenLocalClient.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.llap.security;
-
-import java.io.IOException;
-
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.security.token.Token;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class LlapTokenLocalClient {
-  private static final Logger LOG = LoggerFactory.getLogger(LlapTokenLocalClient.class);
-  private final SecretManager secretManager;
-
-  public LlapTokenLocalClient(Configuration conf, String clusterId) {
-    // TODO: create this centrally in HS2 case
-    secretManager = SecretManager.createSecretManager(conf, clusterId);
-  }
-
-  public Token<LlapTokenIdentifier> createToken(
-      String appId, String user, boolean isSignatureRequired) throws IOException {
-    try {
-      Token<LlapTokenIdentifier> token = secretManager.createLlapToken(
-          appId, user, isSignatureRequired);
-      if (LOG.isInfoEnabled()) {
-        LOG.info("Created a LLAP delegation token locally: " + token);
-      }
-      return token;
-    } catch (Exception ex) {
-      throw (ex instanceof IOException) ? (IOException)ex : new IOException(ex);
-    }
-  }
-
-  public void close() {
-    try {
-      secretManager.stopThreads();
-    } catch (Exception ex) {
-      // Ignore.
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/6a59cfd5/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenLocalClientImpl.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenLocalClientImpl.java b/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenLocalClientImpl.java
new file mode 100644
index 0000000..78510b0
--- /dev/null
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenLocalClientImpl.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.security;
+
+import java.io.IOException;
+
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.token.Token;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LlapTokenLocalClientImpl implements LlapTokenLocalClient {
+  private static final Logger LOG = LoggerFactory.getLogger(LlapTokenLocalClientImpl.class);
+  private final SecretManager secretManager;
+
+  public LlapTokenLocalClientImpl(Configuration conf, String clusterId) {
+    // TODO: create this centrally in HS2 case
+    secretManager = SecretManager.createSecretManager(conf, clusterId);
+  }
+
+  @Override
+  public Token<LlapTokenIdentifier> createToken(
+      String appId, String user, boolean isSignatureRequired) throws IOException {
+    try {
+      Token<LlapTokenIdentifier> token = secretManager.createLlapToken(
+          appId, user, isSignatureRequired);
+      if (LOG.isInfoEnabled()) {
+        LOG.info("Created a LLAP delegation token locally: " + token);
+      }
+      return token;
+    } catch (Exception ex) {
+      throw (ex instanceof IOException) ? (IOException)ex : new IOException(ex);
+    }
+  }
+
+  @Override
+  public void close() {
+    try {
+      secretManager.stopThreads();
+    } catch (Exception ex) {
+      LOG.warn("Error stopping the token secret manager; ignoring", ex); // best-effort close
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/6a59cfd5/llap-common/src/java/org/apache/hadoop/hive/llap/security/LlapSigner.java
----------------------------------------------------------------------
diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/security/LlapSigner.java b/llap-common/src/java/org/apache/hadoop/hive/llap/security/LlapSigner.java
index 478a40a..7b9c8cc 100644
--- a/llap-common/src/java/org/apache/hadoop/hive/llap/security/LlapSigner.java
+++ b/llap-common/src/java/org/apache/hadoop/hive/llap/security/LlapSigner.java
@@ -24,7 +24,7 @@ public interface LlapSigner {
   /** An object signable by a signer. */
   public interface Signable {
     /** Called by the signer to record key information as part of the message to be signed. */
-    void setSignInfo(int masterKeyId, String user);
+    void setSignInfo(int masterKeyId);
     /** Called by the signer to get the serialized representation of the message to be signed. */
     byte[] serialize() throws IOException;
   }
@@ -38,4 +38,6 @@ public interface LlapSigner {
   SignedMessage serializeAndSign(Signable message) throws IOException;
 
   void checkSignature(byte[] message, byte[] signature, int keyId);
+
+  void close();
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/6a59cfd5/llap-common/src/java/org/apache/hadoop/hive/llap/security/LlapSignerImpl.java
----------------------------------------------------------------------
diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/security/LlapSignerImpl.java b/llap-common/src/java/org/apache/hadoop/hive/llap/security/LlapSignerImpl.java
new file mode 100644
index 0000000..7f10505
--- /dev/null
+++ b/llap-common/src/java/org/apache/hadoop/hive/llap/security/LlapSignerImpl.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.security;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.delegation.DelegationKey;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+
+public class LlapSignerImpl implements LlapSigner {
+  private static final Logger LOG = LoggerFactory.getLogger(LlapSignerImpl.class);
+
+  private final SigningSecretManager secretManager;
+
+  public LlapSignerImpl(Configuration conf, String clusterId) {
+    // TODO: create this centrally in HS2 case
+    assert UserGroupInformation.isSecurityEnabled();
+    secretManager = SecretManager.createSecretManager(conf, clusterId);
+  }
+
+  @VisibleForTesting
+  public LlapSignerImpl(SigningSecretManager sm) {
+    secretManager = sm;
+  }
+
+  @Override
+  public SignedMessage serializeAndSign(Signable message) throws IOException {
+    SignedMessage result = new SignedMessage();
+    DelegationKey key = secretManager.getCurrentKey();
+    message.setSignInfo(key.getKeyId());
+    result.message = message.serialize();
+    result.signature = secretManager.signWithKey(result.message, key);
+    return result;
+  }
+
+  @Override
+  public void checkSignature(byte[] message, byte[] signature, int keyId)
+      throws SecurityException {
+    byte[] expectedSignature = secretManager.signWithKey(message, keyId);
+    if (java.security.MessageDigest.isEqual(signature, expectedSignature)) return; // const-time
+    throw new SecurityException("Message signature does not match");
+  }
+
+  @Override
+  public void close() {
+    try {
+      secretManager.close();
+    } catch (Exception ex) {
+      LOG.error("Error closing the signer", ex);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/6a59cfd5/llap-common/src/java/org/apache/hadoop/hive/llap/security/LlapTokenLocalClient.java
----------------------------------------------------------------------
diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/security/LlapTokenLocalClient.java b/llap-common/src/java/org/apache/hadoop/hive/llap/security/LlapTokenLocalClient.java
new file mode 100644
index 0000000..fd09652
--- /dev/null
+++ b/llap-common/src/java/org/apache/hadoop/hive/llap/security/LlapTokenLocalClient.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.security;
+
+import java.io.IOException;
+
+import org.apache.hadoop.security.token.Token;
+/** Client that creates LLAP tokens; call {@link #close()} once it is no longer needed. */
+public interface LlapTokenLocalClient {
+  Token<LlapTokenIdentifier> createToken(
+      String appId, String user, boolean isSignatureRequired) throws IOException;
+  /** NOTE(review): presumably releases whatever the implementation holds - confirm. */
+  void close();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/6a59cfd5/llap-common/src/java/org/apache/hadoop/hive/llap/security/SecretManager.java
----------------------------------------------------------------------
diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/security/SecretManager.java b/llap-common/src/java/org/apache/hadoop/hive/llap/security/SecretManager.java
index 540f978..58a8e96 100644
--- a/llap-common/src/java/org/apache/hadoop/hive/llap/security/SecretManager.java
+++ b/llap-common/src/java/org/apache/hadoop/hive/llap/security/SecretManager.java
@@ -179,6 +179,7 @@ public class SecretManager extends ZKDelegationTokenSecretManager<LlapTokenIdent
 
   public static SecretManager createSecretManager(
       final Configuration conf, String llapPrincipal, String llapKeytab, final String clusterId) {
+    assert UserGroupInformation.isSecurityEnabled();
     final LlapZkConf c = createLlapZkConf(conf, llapPrincipal, llapKeytab, clusterId);
     return c.zkUgi.doAs(new PrivilegedAction<SecretManager>() {
       @Override
@@ -222,6 +223,11 @@ public class SecretManager extends ZKDelegationTokenSecretManager<LlapTokenIdent
     return token;
   }
 
+  @Override
+  public void close() {
+    stopThreads();
+  }
+
   private static void checkRootAcls(Configuration conf, String path, String user) {
     int stime = conf.getInt(ZK_DTSM_ZK_SESSION_TIMEOUT, ZK_DTSM_ZK_SESSION_TIMEOUT_DEFAULT),
         ctime = conf.getInt(ZK_DTSM_ZK_CONNECTION_TIMEOUT, ZK_DTSM_ZK_CONNECTION_TIMEOUT_DEFAULT);

http://git-wip-us.apache.org/repos/asf/hive/blob/6a59cfd5/llap-common/src/java/org/apache/hadoop/hive/llap/security/SigningSecretManager.java
----------------------------------------------------------------------
diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/security/SigningSecretManager.java b/llap-common/src/java/org/apache/hadoop/hive/llap/security/SigningSecretManager.java
index 067a98e..82b1992 100644
--- a/llap-common/src/java/org/apache/hadoop/hive/llap/security/SigningSecretManager.java
+++ b/llap-common/src/java/org/apache/hadoop/hive/llap/security/SigningSecretManager.java
@@ -23,4 +23,5 @@ public interface SigningSecretManager {
   DelegationKey getCurrentKey();
   byte[] signWithKey(byte[] message, DelegationKey key);
   byte[] signWithKey(byte[] message, int keyId) throws SecurityException;
+  void close();
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/6a59cfd5/llap-common/src/java/org/apache/hadoop/hive/llap/tez/Converters.java
----------------------------------------------------------------------
diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/tez/Converters.java b/llap-common/src/java/org/apache/hadoop/hive/llap/tez/Converters.java
index e43b72b..dad5e07 100644
--- a/llap-common/src/java/org/apache/hadoop/hive/llap/tez/Converters.java
+++ b/llap-common/src/java/org/apache/hadoop/hive/llap/tez/Converters.java
@@ -114,8 +114,8 @@ public class Converters {
     return idBuilder.build();
   }
 
-  public static SignableVertexSpec convertTaskSpecToProto(TaskSpec taskSpec,
-      int appAttemptId, String tokenIdentifier, Integer signatureKeyId, String user) {
+  public static SignableVertexSpec.Builder convertTaskSpecToProto(TaskSpec taskSpec,
+      int appAttemptId, String tokenIdentifier, String user) {
     TezTaskAttemptID tId = taskSpec.getTaskAttemptID();
 
     SignableVertexSpec.Builder builder = SignableVertexSpec.newBuilder();
@@ -125,9 +125,6 @@ public class Converters {
     builder.setVertexParallelism(taskSpec.getVertexParallelism());
     builder.setTokenIdentifier(tokenIdentifier);
     builder.setUser(user);
-    if (signatureKeyId != null) {
-      builder.setSignatureKeyId(signatureKeyId);
-    }
 
     if (taskSpec.getProcessorDescriptor() != null) {
       builder.setProcessorDescriptor(
@@ -152,7 +149,7 @@ public class Converters {
 
       }
     }
-    return builder.build();
+    return builder;
   }
 
   private static ProcessorDescriptor convertProcessorDescriptorFromProto(

http://git-wip-us.apache.org/repos/asf/hive/blob/6a59cfd5/llap-common/src/test/org/apache/hadoop/hive/llap/tez/TestConverters.java
----------------------------------------------------------------------
diff --git a/llap-common/src/test/org/apache/hadoop/hive/llap/tez/TestConverters.java b/llap-common/src/test/org/apache/hadoop/hive/llap/tez/TestConverters.java
index 1df6df0..85c6091 100644
--- a/llap-common/src/test/org/apache/hadoop/hive/llap/tez/TestConverters.java
+++ b/llap-common/src/test/org/apache/hadoop/hive/llap/tez/TestConverters.java
@@ -77,7 +77,7 @@ public class TestConverters {
         new TaskSpec(tezTaskAttemptId, "dagName", "vertexName", 10, processorDescriptor,
             inputSpecList, outputSpecList, null);
 
-    SignableVertexSpec vertexProto = Converters.convertTaskSpecToProto(taskSpec, 0, "", null, "");
+    SignableVertexSpec vertexProto = Converters.convertTaskSpecToProto(taskSpec, 0, "", "").build();
 
     assertEquals("dagName", vertexProto.getDagName());
     assertEquals("vertexName", vertexProto.getVertexName());

http://git-wip-us.apache.org/repos/asf/hive/blob/6a59cfd5/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
----------------------------------------------------------------------
diff --git a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
index 4306c22..e16ccf5 100644
--- a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
+++ b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
@@ -16,34 +16,25 @@
  */
 package org.apache.hadoop.hive.llap;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import java.sql.SQLException;
-import java.sql.Connection;
-import java.sql.ResultSet;
-import java.sql.Statement;
-import java.sql.DriverManager;
-
-import java.io.IOException;
 import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.DataInputStream;
-import java.io.ByteArrayInputStream;
+import java.io.IOException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.nio.ByteBuffer;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.commons.collections4.ListUtils;
-
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.llap.LlapBaseRecordReader;
 import org.apache.hadoop.hive.llap.LlapBaseRecordReader.ReaderEvent;
-import org.apache.hadoop.hive.llap.LlapInputSplit;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentRuntimeInfo;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SignableVertexSpec;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
@@ -56,28 +47,21 @@ import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
 import org.apache.hadoop.hive.llap.tez.Converters;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.InputSplitWithLocationInfo;
-import org.apache.hadoop.mapred.SplitLocationInfo;
-import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
-
 import org.apache.tez.common.security.JobTokenIdentifier;
 import org.apache.tez.common.security.TokenCache;
 import org.apache.tez.dag.records.TezTaskAttemptID;
@@ -85,14 +69,10 @@ import org.apache.tez.runtime.api.events.TaskAttemptFailedEvent;
 import org.apache.tez.runtime.api.impl.EventType;
 import org.apache.tez.runtime.api.impl.TaskSpec;
 import org.apache.tez.runtime.api.impl.TezEvent;
-import org.apache.tez.runtime.api.impl.TezEvent;
 import org.apache.tez.runtime.api.impl.TezHeartbeatRequest;
-import org.apache.tez.runtime.api.impl.TezHeartbeatResponse;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.protobuf.ByteString;
 
@@ -100,7 +80,8 @@ import com.google.protobuf.ByteString;
 /**
  * Base LLAP input format to handle requesting of splits and communication with LLAP daemon.
  */
-public class LlapBaseInputFormat<V extends WritableComparable> implements InputFormat<NullWritable, V> {
+public class LlapBaseInputFormat<V extends WritableComparable<?>>
+  implements InputFormat<NullWritable, V> {
 
   private static final Logger LOG = LoggerFactory.getLogger(LlapBaseInputFormat.class);
 
@@ -148,12 +129,14 @@ public class LlapBaseInputFormat<V extends WritableComparable> implements InputF
 
     LlapRecordReaderTaskUmbilicalExternalResponder umbilicalResponder =
         new LlapRecordReaderTaskUmbilicalExternalResponder();
+    // TODO: close this
     LlapTaskUmbilicalExternalClient llapClient =
       new LlapTaskUmbilicalExternalClient(job, submitWorkInfo.getTokenIdentifier(),
           submitWorkInfo.getToken(), umbilicalResponder);
     llapClient.init(job);
     llapClient.start();
 
+    // TODO# vertex in this
     SubmitWorkRequestProto submitWorkRequestProto =
       constructSubmitWorkRequestProto(submitWorkInfo, llapSplit.getSplitNum(),
           llapClient.getAddress(), submitWorkInfo.getToken());
@@ -169,9 +152,8 @@ public class LlapBaseInputFormat<V extends WritableComparable> implements InputF
 
     String id = HiveConf.getVar(job, HiveConf.ConfVars.HIVEQUERYID) + "_" + llapSplit.getSplitNum();
 
-    HiveConf conf = new HiveConf();
-    Socket socket = new Socket(host,
-        serviceInstance.getOutputFormatPort());
+    // TODO: security for output channel
+    Socket socket = new Socket(host, serviceInstance.getOutputFormatPort());
 
     LOG.debug("Socket connected");
 
@@ -181,7 +163,9 @@ public class LlapBaseInputFormat<V extends WritableComparable> implements InputF
 
     LOG.info("Registered id: " + id);
 
-    LlapBaseRecordReader recordReader = new LlapBaseRecordReader(socket.getInputStream(), llapSplit.getSchema(), Text.class, job);
+    @SuppressWarnings("rawtypes")
+    LlapBaseRecordReader recordReader = new LlapBaseRecordReader(
+        socket.getInputStream(), llapSplit.getSchema(), Text.class, job);
     umbilicalResponder.setRecordReader(recordReader);
     return recordReader;
   }
@@ -294,26 +278,22 @@ public class LlapBaseInputFormat<V extends WritableComparable> implements InputF
     return null;
   }
 
-  private SubmitWorkRequestProto constructSubmitWorkRequestProto(SubmitWorkInfo submitWorkInfo,
+  private SubmitWorkRequestProto constructSubmitWorkRequestProto(
+      SubmitWorkInfo submitWorkInfo,
       int taskNum,
       InetSocketAddress address,
       Token<JobTokenIdentifier> token) throws
         IOException {
-    TaskSpec taskSpec = submitWorkInfo.getTaskSpec();
     ApplicationId appId = submitWorkInfo.getFakeAppId();
 
-    int attemptId = taskSpec.getTaskAttemptID().getId();
     // This works, assuming the executor is running within YARN.
     String user = System.getenv(ApplicationConstants.Environment.USER.name());
     LOG.info("Setting user in submitWorkRequest to: " + user);
-    SignableVertexSpec svs = Converters.convertTaskSpecToProto(
-        taskSpec, attemptId, appId.toString(), null, user); // TODO signatureKeyId
 
+    // TODO: this is bogus. What does LLAP use this for?
     ContainerId containerId =
-      ContainerId.newInstance(ApplicationAttemptId.newInstance(appId, 0), taskNum);
-
+        ContainerId.newInstance(ApplicationAttemptId.newInstance(appId, 0), taskNum);
 
-    Credentials taskCredentials = new Credentials();
     // Credentials can change across DAGs. Ideally construct only once per DAG.
     Credentials credentials = new Credentials();
     TokenCache.setSessionToken(token, credentials);
@@ -324,15 +304,20 @@ public class LlapBaseInputFormat<V extends WritableComparable> implements InputF
     runtimeInfo.setWithinDagPriority(0);
     runtimeInfo.setDagStartTime(submitWorkInfo.getCreationTime());
     runtimeInfo.setFirstAttemptStartTime(submitWorkInfo.getCreationTime());
-    runtimeInfo.setNumSelfAndUpstreamTasks(taskSpec.getVertexParallelism());
+    runtimeInfo.setNumSelfAndUpstreamTasks(submitWorkInfo.getVertexParallelism());
     runtimeInfo.setNumSelfAndUpstreamCompletedTasks(0);
 
     SubmitWorkRequestProto.Builder builder = SubmitWorkRequestProto.newBuilder();
 
-    builder.setWorkSpec(VertexOrBinary.newBuilder().setVertex(svs).build());
-    // TODO work spec signature
-    builder.setFragmentNumber(taskSpec.getTaskAttemptID().getTaskID().getId());
-    builder.setAttemptNumber(0);
+    VertexOrBinary.Builder vertexBuilder = VertexOrBinary.newBuilder();
+    vertexBuilder.setVertexBinary(ByteString.copyFrom(submitWorkInfo.getVertexBinary()));
+    if (submitWorkInfo.getVertexSignature() != null) {
+      // Unsecure case?
+      builder.setWorkSpecSignature(ByteString.copyFrom(submitWorkInfo.getVertexSignature()));
+    }
+    builder.setWorkSpec(vertexBuilder.build());
+    builder.setFragmentNumber(taskNum);
+    builder.setAttemptNumber(0); // TODO: hmm
     builder.setContainerIdString(containerId.toString());
     builder.setAmHost(address.getHostName());
     builder.setAmPort(address.getPort());
@@ -351,7 +336,7 @@ public class LlapBaseInputFormat<V extends WritableComparable> implements InputF
   }
 
   private static class LlapRecordReaderTaskUmbilicalExternalResponder implements LlapTaskUmbilicalExternalResponder {
-    protected LlapBaseRecordReader recordReader = null;
+    protected LlapBaseRecordReader<?> recordReader = null;
     protected LinkedBlockingQueue<ReaderEvent> queuedEvents = new LinkedBlockingQueue<ReaderEvent>();
 
     public LlapRecordReaderTaskUmbilicalExternalResponder() {
@@ -369,6 +354,7 @@ public class LlapBaseInputFormat<V extends WritableComparable> implements InputF
 
     @Override
     public void heartbeat(TezHeartbeatRequest request) {
+      // TODO: why is this ignored?
       TezTaskAttemptID taskAttemptId = request.getCurrentTaskAttemptID();
       List<TezEvent> inEvents = request.getEvents();
       for (TezEvent tezEvent : ListUtils.emptyIfNull(inEvents)) {
@@ -415,7 +401,7 @@ public class LlapBaseInputFormat<V extends WritableComparable> implements InputF
       }
     }
 
-    public synchronized LlapBaseRecordReader getRecordReader() {
+    public synchronized LlapBaseRecordReader<?> getRecordReader() {
       return recordReader;
     }
 
@@ -441,7 +427,7 @@ public class LlapBaseInputFormat<V extends WritableComparable> implements InputF
      * @param readerEvent
      */
     protected synchronized void sendOrQueueEvent(ReaderEvent readerEvent) {
-      LlapBaseRecordReader recordReader = getRecordReader();
+      LlapBaseRecordReader<?> recordReader = getRecordReader();
       if (recordReader != null) {
         recordReader.handleEvent(readerEvent);
       } else {

http://git-wip-us.apache.org/repos/asf/hive/blob/6a59cfd5/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
index 6f21d3c..387000d 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
@@ -113,7 +113,7 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
     this.localShufflePort = localShufflePort;
     this.amReporter = amReporter;
     this.signer = UserGroupInformation.isSecurityEnabled()
-        ? new LlapSignerImpl(conf, daemonId) : null;
+        ? new LlapSignerImpl(conf, daemonId.getClusterString()) : null;
     this.fsUgiFactory = fsUgiFactory;
 
     this.clusterId = daemonId.getClusterString();

http://git-wip-us.apache.org/repos/asf/hive/blob/6a59cfd5/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapSignerImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapSignerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapSignerImpl.java
deleted file mode 100644
index 4174593..0000000
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapSignerImpl.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.llap.security;
-
-import java.io.IOException;
-import java.util.Arrays;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.llap.DaemonId;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.delegation.DelegationKey;
-
-import com.google.common.annotations.VisibleForTesting;
-
-public class LlapSignerImpl implements LlapSigner {
-  private final SigningSecretManager secretManager;
-
-  public LlapSignerImpl(Configuration conf, DaemonId daemonId) {
-    // TODO: create this centrally in HS2 case
-    secretManager = SecretManager.createSecretManager(conf, daemonId.getClusterString());
-  }
-
-  @VisibleForTesting
-  public LlapSignerImpl(SigningSecretManager sm) {
-    secretManager = sm;
-  }
-
-  @Override
-  public SignedMessage serializeAndSign(Signable message) throws IOException {
-    SignedMessage result = new SignedMessage();
-    DelegationKey key = secretManager.getCurrentKey();
-    message.setSignInfo(key.getKeyId(), UserGroupInformation.getCurrentUser().getUserName());
-    result.message = message.serialize();
-    result.signature = secretManager.signWithKey(result.message, key);
-    return result;
-  }
-
-  @Override
-  public void checkSignature(byte[] message, byte[] signature, int keyId)
-      throws SecurityException {
-    byte[] expectedSignature = secretManager.signWithKey(message, keyId);
-    if (Arrays.equals(signature, expectedSignature)) return;
-    throw new SecurityException("Message signature does not match");
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/6a59cfd5/llap-server/src/test/org/apache/hadoop/hive/llap/security/TestLlapSignerImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/security/TestLlapSignerImpl.java b/llap-server/src/test/org/apache/hadoop/hive/llap/security/TestLlapSignerImpl.java
index 0420225..a281fd6 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/security/TestLlapSignerImpl.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/security/TestLlapSignerImpl.java
@@ -127,7 +127,7 @@ public class TestLlapSignerImpl {
     }
 
     @Override
-    public void setSignInfo(int masterKeyId, String user) {
+    public void setSignInfo(int masterKeyId) {
       this.masterKeyId = masterKeyId;
     }
 
@@ -196,5 +196,10 @@ public class TestLlapSignerImpl {
     public AbstractDelegationTokenIdentifier createIdentifier() {
       throw new UnsupportedOperationException();
     }
+
+    @Override
+    public void close() {
+      stopThreads();
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/6a59cfd5/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
----------------------------------------------------------------------
diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
index 026df3b..80096f5 100644
--- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
+++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
@@ -599,7 +599,7 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
     }
     builder.setCredentialsBinary(ByteString.copyFrom(credentialsBinary));
     builder.setWorkSpec(VertexOrBinary.newBuilder().setVertex(Converters.convertTaskSpecToProto(
-        taskSpec, appAttemptId, getTokenIdentifier(), null, user)).build());
+        taskSpec, appAttemptId, getTokenIdentifier(), user)).build());
     // Don't call builder.setWorkSpecSignature() - Tez doesn't sign fragments
     builder.setFragmentRuntimeInfo(fragmentRuntimeInfo);
     return builder.build();

http://git-wip-us.apache.org/repos/asf/hive/blob/6a59cfd5/ql/pom.xml
----------------------------------------------------------------------
diff --git a/ql/pom.xml b/ql/pom.xml
index db03978..02ddb80 100644
--- a/ql/pom.xml
+++ b/ql/pom.xml
@@ -812,6 +812,7 @@
                   <include>org.apache.hive:hive-common</include>
                   <include>org.apache.hive:hive-exec</include>
                   <include>org.apache.hive:hive-serde</include>
+                  <include>org.apache.hive:hive-llap-common</include>
                   <include>org.apache.hive:hive-llap-client</include>
                   <include>org.apache.hive:hive-metastore</include>
                   <include>org.apache.hive:hive-service-rpc</include>

http://git-wip-us.apache.org/repos/asf/hive/blob/6a59cfd5/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
index d04cfa3..919b35a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
@@ -51,12 +51,11 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.llap.DaemonId;
-import org.apache.hadoop.hive.llap.LlapUtil;
+import org.apache.hadoop.hive.llap.coordinator.LlapCoordinator;
 import org.apache.hadoop.hive.llap.impl.LlapProtocolClientImpl;
+import org.apache.hadoop.hive.llap.io.api.LlapProxy;
 import org.apache.hadoop.hive.llap.security.LlapTokenClient;
 import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier;
-import org.apache.hadoop.hive.llap.security.LlapTokenLocalClient;
 import org.apache.hadoop.hive.llap.tez.LlapProtocolClientProxy;
 import org.apache.hadoop.hive.llap.tezplugins.LlapContainerLauncher;
 import org.apache.hadoop.hive.llap.tezplugins.LlapTaskCommunicator;
@@ -85,11 +84,6 @@ import org.apache.tez.serviceplugins.api.TaskSchedulerDescriptor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.RemovalListener;
-import com.google.common.cache.RemovalNotification;
-
 /**
  * Holds session state related to Tez
  */
@@ -338,19 +332,6 @@ public class TezSessionState {
     }
   }
 
-  // Only cache ZK connections (ie local clients); these are presumed to be used in HS2.
-  // TODO: temporary before HIVE-13698.
-  private static final Cache<String, LlapTokenLocalClient> localClientCache = CacheBuilder
-      .newBuilder().expireAfterAccess(10, TimeUnit.MINUTES)
-      .removalListener(new RemovalListener<String, LlapTokenLocalClient>() {
-        @Override
-        public void onRemoval(RemovalNotification<String, LlapTokenLocalClient> notification) {
-          if (notification.getValue() != null) {
-            notification.getValue().close();
-          }
-        }
-      }).build();
-
   private static Token<LlapTokenIdentifier> getLlapToken(
       String user, final Configuration conf) throws IOException {
     // TODO: parts of this should be moved out of TezSession to reuse the clients, but there's
@@ -359,21 +340,17 @@ public class TezSessionState {
     boolean isInHs2 = session != null && session.isHiveServerQuery();
     Token<LlapTokenIdentifier> token = null;
     // For Tez, we don't use appId to distinguish the tokens.
+
+    LlapCoordinator coordinator = null;
     if (isInHs2) {
       // We are in HS2, get the token locally.
-      String clusterName = LlapUtil.generateClusterName(conf);
-      // This assumes that the LLAP cluster and session are both running under HS2 user.
-      final String clusterId = DaemonId.createClusterString(user, clusterName);
-      try {
-        token = localClientCache.get(clusterId, new Callable<LlapTokenLocalClient>() {
-          @Override
-          public LlapTokenLocalClient call() throws Exception {
-            return new LlapTokenLocalClient(conf, clusterId);
-          }
-        }).createToken(null, null, false); // Signature is not required for Tez.
-      } catch (ExecutionException e) {
-        throw new IOException(e);
+      // TODO: coordinator should be passed in; HIVE-13698. Must be initialized for now.
+      coordinator = LlapCoordinator.getInstance();
+      if (coordinator == null) {
+        throw new IOException("LLAP coordinator not initialized; cannot get LLAP tokens");
       }
+      // Signing is not required for Tez.
+      token = coordinator.getLocalTokenClient(conf, user).createToken(null, null, false);
     } else {
       // We are not in HS2; always create a new client for now.
       token = new LlapTokenClient(conf).getDelegationToken(null);

http://git-wip-us.apache.org/repos/asf/hive/blob/6a59cfd5/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
index 140dbda..7be6f7b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
@@ -28,7 +28,6 @@ import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
-import java.util.Random;
 import java.util.Set;
 import java.util.UUID;
 
@@ -49,6 +48,12 @@ import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.llap.Schema;
 import org.apache.hadoop.hive.llap.FieldDesc;
 import org.apache.hadoop.hive.llap.TypeDesc;
+import org.apache.hadoop.hive.llap.coordinator.LlapCoordinator;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SignableVertexSpec;
+import org.apache.hadoop.hive.llap.security.LlapSigner;
+import org.apache.hadoop.hive.llap.security.LlapSigner.Signable;
+import org.apache.hadoop.hive.llap.security.LlapSigner.SignedMessage;
+import org.apache.hadoop.hive.llap.tez.Converters;
 import org.apache.hadoop.hive.ql.CommandNeedRetryException;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.Driver;
@@ -61,7 +66,6 @@ import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
 import org.apache.hadoop.hive.ql.exec.tez.DagUtils;
 import org.apache.hadoop.hive.ql.exec.tez.HiveSplitGenerator;
 import org.apache.hadoop.hive.ql.exec.tez.TezTask;
-import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.TezWork;
@@ -91,7 +95,6 @@ import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.TaskLocationHint;
 import org.apache.tez.dag.api.TaskSpecBuilder;
-import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.mapreduce.grouper.TezSplitGrouper;
 import org.apache.tez.runtime.api.Event;
@@ -112,11 +115,8 @@ import com.google.common.base.Preconditions;
     + "Returns an array of length int serialized splits for the referenced tables string.")
 @UDFType(deterministic = false)
 public class GenericUDTFGetSplits extends GenericUDTF {
-
   private static final Logger LOG = LoggerFactory.getLogger(GenericUDTFGetSplits.class);
 
-  private static final String LLAP_INTERNAL_INPUT_FORMAT_NAME = "org.apache.hadoop.hive.llap.LlapInputFormat";
-
   protected transient StringObjectInspector stringOI;
   protected transient IntObjectInspector intOI;
   protected transient JobConf jc;
@@ -307,10 +307,10 @@ public class GenericUDTFGetSplits extends GenericUDTF {
       Preconditions.checkState(HiveConf.getBoolVar(wxConf,
               HiveConf.ConfVars.LLAP_CLIENT_CONSISTENT_SPLITS));
       HiveSplitGenerator splitGenerator = new HiveSplitGenerator(wxConf, mapWork);
+      // TODO: these need to be signed, or at least the splits inside.
       List<Event> eventList = splitGenerator.initialize();
 
       InputSplit[] result = new InputSplit[eventList.size() - 1];
-      DataOutputBuffer dob = new DataOutputBuffer();
 
       InputConfigureVertexTasksEvent configureEvent
         = (InputConfigureVertexTasksEvent) eventList.get(0);
@@ -324,19 +324,35 @@ public class GenericUDTFGetSplits extends GenericUDTF {
         LOG.debug("NumSplits=" + result.length);
       }
 
-      ApplicationId fakeApplicationId
-        = ApplicationId.newInstance(Math.abs(new Random().nextInt()), 0);
+      LlapCoordinator coordinator = LlapCoordinator.getInstance();
+      if (coordinator == null) {
+        throw new IOException("LLAP coordinator is not initialized; must be running in HS2 with "
+            + ConfVars.LLAP_HS2_ENABLE_COORDINATOR.varname + " enabled");
+      }
+
+      // See the discussion in the implementation as to why we generate app ID.
+      ApplicationId fakeApplicationId = coordinator.createExtClientAppId();
 
+      // This assumes LLAP cluster owner is always the HS2 user.
       String llapUser = UserGroupInformation.getLoginUser().getShortUserName();
+      LlapSigner signer = UserGroupInformation.isSecurityEnabled()
+          ? coordinator.getLlapSigner(job) : null;
+
       LOG.info("Number of splits: " + (eventList.size() - 1));
+      SignedMessage signedSvs = null;
+      DataOutputBuffer dob = new DataOutputBuffer();
       for (int i = 0; i < eventList.size() - 1; i++) {
-
-        TaskSpec taskSpec =
-          new TaskSpecBuilder().constructTaskSpec(dag, vertexName,
+        TaskSpec taskSpec = new TaskSpecBuilder().constructTaskSpec(dag, vertexName,
               eventList.size() - 1, fakeApplicationId, i);
 
-        SubmitWorkInfo submitWorkInfo =
-          new SubmitWorkInfo(taskSpec, fakeApplicationId, System.currentTimeMillis());
+        if (i == 0) {
+          // Despite the differences in TaskSpec, the vertex spec should be the same.
+          signedSvs = createSignedVertexSpec(signer, taskSpec, fakeApplicationId);
+        }
+
+        SubmitWorkInfo submitWorkInfo = new SubmitWorkInfo(fakeApplicationId,
+            System.currentTimeMillis(), taskSpec.getVertexParallelism(), signedSvs.message,
+            signedSvs.signature);
         EventMetaData sourceMetaData =
           new EventMetaData(EventMetaData.EventProducerConsumerType.INPUT, vertexName,
               "NULL_VERTEX", null);
@@ -372,6 +388,40 @@ public class GenericUDTFGetSplits extends GenericUDTF {
     }
   }
 
+  private SignedMessage createSignedVertexSpec(LlapSigner signer, TaskSpec taskSpec,
+      ApplicationId fakeApplicationId) throws IOException { // A null signer is accepted.
+    // We put the query user, not LLAP user, in the message (and the token later?)
+    String user = SessionState.getUserFromAuthenticator();
+    if (user == null) { // No session user; fall back to the current UGI user.
+      user = UserGroupInformation.getCurrentUser().getUserName();
+      LOG.warn("Cannot determine the session user; using " + user + " instead");
+    }
+    final SignableVertexSpec.Builder svsb = Converters.convertTaskSpecToProto(
+        taskSpec, 0, fakeApplicationId.toString(), user); // NOTE(review): meaning of 0 - confirm.
+    if (signer == null) { // Unsigned path: only message is set; signature is not assigned.
+      SignedMessage result = new SignedMessage();
+      result.message = serializeVertexSpec(svsb);
+      return result;
+    }
+    return signer.serializeAndSign(new Signable() { // Callbacks are handed to the signer.
+      @Override
+      public void setSignInfo(int masterKeyId) {
+        svsb.setSignatureKeyId(masterKeyId); // Records the key id on the shared builder.
+      }
+
+      @Override
+      public byte[] serialize() throws IOException {
+        return serializeVertexSpec(svsb); // Serializes the builder's state at call time.
+      }
+    });
+  }
+
+  private static byte[] serializeVertexSpec(SignableVertexSpec.Builder svsb) throws IOException {
+    ByteArrayOutputStream os = new ByteArrayOutputStream(); // In-memory sink for the bytes.
+    svsb.build().writeTo(os); // Builds the message from the builder and writes it out.
+    return os.toByteArray();
+  }
+
   /**
    * Returns a local resource representing a jar. This resource will be used to execute the plan on
    * the cluster.

http://git-wip-us.apache.org/repos/asf/hive/blob/6a59cfd5/service/src/java/org/apache/hive/service/server/HiveServer2.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/server/HiveServer2.java b/service/src/java/org/apache/hive/service/server/HiveServer2.java
index d61edf5..14e1e0d 100644
--- a/service/src/java/org/apache/hive/service/server/HiveServer2.java
+++ b/service/src/java/org/apache/hive/service/server/HiveServer2.java
@@ -50,6 +50,8 @@ import org.apache.hadoop.hive.common.cli.CommonCliOptions;
 import org.apache.hadoop.hive.common.metrics.common.MetricsFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.llap.coordinator.LlapCoordinator;
+import org.apache.hadoop.hive.llap.io.api.LlapProxy;
 import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManagerImpl;
 import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolManager;
 import org.apache.hadoop.hive.ql.util.ZooKeeperHiveHelper;
@@ -133,6 +135,14 @@ public class HiveServer2 extends CompositeService {
     } catch (Throwable t) {
       throw new Error("Unable to intitialize HiveServer2", t);
     }
+    if (HiveConf.getBoolVar(hiveConf, ConfVars.LLAP_HS2_ENABLE_COORDINATOR)) {
+      // See method comment.
+      try {
+        LlapCoordinator.initializeInstance(hiveConf);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
     // Setup web UI
     try {
       if (hiveConf.getBoolVar(ConfVars.HIVE_IN_TEST)) {


Mime
View raw message