hive-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ser...@apache.org
Subject [1/2] hive git commit: HIVE-12341 : LLAP: add security to daemon protocol endpoint (excluding shuffle) (Sergey Shelukhin, reviewed by Siddharth Seth, Lefty Leverenz)
Date Thu, 10 Dec 2015 00:05:36 GMT
Repository: hive
Updated Branches:
  refs/heads/branch-2.0 30e25028b -> 2d2562c4c


http://git-wip-us.apache.org/repos/asf/hive/blob/2d2562c4/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolServerImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolServerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolServerImpl.java
index 784c631..db0b752 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolServerImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolServerImpl.java
@@ -16,17 +16,26 @@ package org.apache.hadoop.hive.llap.daemon.impl;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.security.PrivilegedAction;
 import java.util.concurrent.atomic.AtomicReference;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.io.ByteArrayDataOutput;
+import com.google.common.io.ByteStreams;
 import com.google.protobuf.BlockingService;
+import com.google.protobuf.ByteString;
 import com.google.protobuf.RpcController;
 import com.google.protobuf.ServiceException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteRequestProto;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteResponseProto;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto;
@@ -35,40 +44,52 @@ import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWor
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentRequestProto;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentResponseProto;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.hive.llap.security.LlapSecurityHelper;
+import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier;
+import org.apache.hadoop.hive.llap.security.SecretManager;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.hive.llap.daemon.ContainerRunner;
 import org.apache.hadoop.hive.llap.daemon.LlapDaemonProtocolBlockingPB;
+import org.apache.hadoop.hive.llap.daemon.LlapManagementProtocolBlockingPB;
+import org.apache.hadoop.hive.llap.security.LlapDaemonPolicyProvider;
 
 public class LlapDaemonProtocolServerImpl extends AbstractService
-    implements LlapDaemonProtocolBlockingPB {
+    implements LlapDaemonProtocolBlockingPB, LlapManagementProtocolBlockingPB {
 
   private static final Logger LOG = LoggerFactory.getLogger(LlapDaemonProtocolServerImpl.class);
 
   private final int numHandlers;
   private final ContainerRunner containerRunner;
-  private final int configuredPort;
-  private RPC.Server server;
-  private final AtomicReference<InetSocketAddress> bindAddress;
-
+  private final int srvPort, mngPort;
+  private RPC.Server server, mngServer;
+  private final AtomicReference<InetSocketAddress> srvAddress, mngAddress;
+  private SecretManager zkSecretManager;
 
   public LlapDaemonProtocolServerImpl(int numHandlers,
                                       ContainerRunner containerRunner,
-                                      AtomicReference<InetSocketAddress> address,
-                                      int configuredPort) {
+                                      AtomicReference<InetSocketAddress> srvAddress,
+                                      AtomicReference<InetSocketAddress> mngAddress,
+                                      int srvPort,
+                                      int mngPort) {
     super("LlapDaemonProtocolServerImpl");
     this.numHandlers = numHandlers;
     this.containerRunner = containerRunner;
-    this.bindAddress = address;
-    this.configuredPort = configuredPort;
+    this.srvAddress = srvAddress;
+    this.srvPort = srvPort;
+    this.mngAddress = mngAddress;
+    this.mngPort = mngPort;
     LOG.info("Creating: " + LlapDaemonProtocolServerImpl.class.getSimpleName() +
-        " with port configured to: " + configuredPort);
+        " with port configured to: " + srvPort);
   }
 
   @Override
-  public SubmitWorkResponseProto submitWork (RpcController controller,
+  public SubmitWorkResponseProto submitWork(RpcController controller,
                                             SubmitWorkRequestProto request) throws
       ServiceException {
     try {
@@ -81,16 +102,14 @@ public class LlapDaemonProtocolServerImpl extends AbstractService
 
   @Override
   public SourceStateUpdatedResponseProto sourceStateUpdated(RpcController controller,
-                                                            SourceStateUpdatedRequestProto request) throws
-      ServiceException {
+      SourceStateUpdatedRequestProto request) throws ServiceException {
     containerRunner.sourceStateUpdated(request);
     return SourceStateUpdatedResponseProto.getDefaultInstance();
   }
 
   @Override
   public QueryCompleteResponseProto queryComplete(RpcController controller,
-                                                  QueryCompleteRequestProto request) throws
-      ServiceException {
+      QueryCompleteRequestProto request) throws ServiceException {
     containerRunner.queryComplete(request);
     return QueryCompleteResponseProto.getDefaultInstance();
   }
@@ -105,24 +124,62 @@ public class LlapDaemonProtocolServerImpl extends AbstractService
 
   @Override
   public void serviceStart() {
-    Configuration conf = getConfig();
+    final Configuration conf = getConfig();
+    final BlockingService daemonImpl =
+        LlapDaemonProtocolProtos.LlapDaemonProtocol.newReflectiveBlockingService(this);
+    final BlockingService managementImpl =
+        LlapDaemonProtocolProtos.LlapManagementProtocol.newReflectiveBlockingService(this);
+    if (!UserGroupInformation.isSecurityEnabled()) {
+      startProtocolServers(conf, daemonImpl, managementImpl);
+      return;
+    }
+    String llapPrincipal = HiveConf.getVar(conf, ConfVars.LLAP_KERBEROS_PRINCIPAL),
+        llapKeytab = HiveConf.getVar(conf, ConfVars.LLAP_KERBEROS_KEYTAB_FILE);
+    zkSecretManager = SecretManager.createSecretManager(conf, llapPrincipal, llapKeytab);
 
-    InetSocketAddress addr = new InetSocketAddress(configuredPort);
+    // Start the protocol server after properly authenticating with daemon keytab.
+    UserGroupInformation daemonUgi = null;
     try {
-      server = createServer(LlapDaemonProtocolBlockingPB.class, addr, conf, numHandlers,
-          LlapDaemonProtocolProtos.LlapDaemonProtocol.newReflectiveBlockingService(this));
+      daemonUgi = LlapSecurityHelper.loginWithKerberos(llapPrincipal, llapKeytab);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    daemonUgi.doAs(new PrivilegedAction<Void>() {
+       @Override
+      public Void run() {
+         startProtocolServers(conf, daemonImpl, managementImpl);
+         return null;
+      }
+    });
+  }
+
+  private void startProtocolServers(
+      Configuration conf, BlockingService daemonImpl, BlockingService managementImpl) {
+    server = startProtocolServer(srvPort, numHandlers, srvAddress, conf, daemonImpl,
+        LlapDaemonProtocolBlockingPB.class);
+    mngServer = startProtocolServer(mngPort, 2, mngAddress, conf, managementImpl,
+        LlapManagementProtocolBlockingPB.class);
+  }
+
+  private RPC.Server startProtocolServer(int srvPort, int numHandlers,
+      AtomicReference<InetSocketAddress> bindAddress, Configuration conf,
+      BlockingService impl, Class<?> protocolClass) {
+    InetSocketAddress addr = new InetSocketAddress(srvPort);
+    RPC.Server server;
+    try {
+      server = createServer(protocolClass, addr, conf, numHandlers, impl);
       server.start();
     } catch (IOException e) {
-      LOG.error("Failed to run RPC Server on port: " + configuredPort, e);
+      LOG.error("Failed to run RPC Server on port: " + srvPort, e);
       throw new RuntimeException(e);
     }
 
     InetSocketAddress serverBindAddress = NetUtils.getConnectAddress(server);
-    this.bindAddress.set(NetUtils.createSocketAddrForHost(
+    bindAddress.set(NetUtils.createSocketAddrForHost(
         serverBindAddress.getAddress().getCanonicalHostName(),
         serverBindAddress.getPort()));
-    LOG.info("Instantiated " + LlapDaemonProtocolBlockingPB.class.getSimpleName() + " at " +
-        bindAddress);
+    LOG.info("Instantiated " + protocolClass.getSimpleName() + " at " + bindAddress);
+    return server;
   }
 
   @Override
@@ -130,26 +187,68 @@ public class LlapDaemonProtocolServerImpl extends AbstractService
     if (server != null) {
       server.stop();
     }
+    if (mngServer != null) {
+      mngServer.stop();
+    }
   }
 
   @InterfaceAudience.Private
   @VisibleForTesting
   InetSocketAddress getBindAddress() {
-    return bindAddress.get();
+    return srvAddress.get();
   }
 
   private RPC.Server createServer(Class<?> pbProtocol, InetSocketAddress addr, Configuration conf,
                                   int numHandlers, BlockingService blockingService) throws
       IOException {
     RPC.setProtocolEngine(conf, pbProtocol, ProtobufRpcEngine.class);
-    RPC.Server server = new RPC.Builder(conf)
+    RPC.Builder builder = new RPC.Builder(conf)
         .setProtocol(pbProtocol)
         .setInstance(blockingService)
         .setBindAddress(addr.getHostName())
         .setPort(addr.getPort())
-        .setNumHandlers(numHandlers)
-        .build();
-    // TODO Add security.
+        .setNumHandlers(numHandlers);
+    if (zkSecretManager != null) {
+      builder = builder.setSecretManager(zkSecretManager);
+    }
+    RPC.Server server = builder.build();
+
+    if (conf.getBoolean(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, false)) {
+      server.refreshServiceAcl(conf, new LlapDaemonPolicyProvider());
+    }
     return server;
   }
+
+  @Override
+  public GetTokenResponseProto getDelegationToken(RpcController controller,
+      GetTokenRequestProto request) throws ServiceException {
+    if (zkSecretManager == null) {
+      throw new ServiceException("Operation not supported on unsecure cluster");
+    }
+    UserGroupInformation ugi;
+    try {
+      ugi = UserGroupInformation.getCurrentUser();
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+    String user = ugi.getUserName();
+    Text owner = new Text(user);
+    Text realUser = null;
+    if (ugi.getRealUser() != null) {
+      realUser = new Text(ugi.getRealUser().getUserName());
+    }
+    Text renewer = new Text(ugi.getShortUserName());
+    LlapTokenIdentifier llapId = new LlapTokenIdentifier(owner, renewer, realUser);
+    // TODO: note that the token is not renewable right now and will last for 2 weeks by default.
+    Token<LlapTokenIdentifier> token = new Token<LlapTokenIdentifier>(llapId, zkSecretManager);
+    ByteArrayDataOutput out = ByteStreams.newDataOutput();
+    try {
+      token.write(out);
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+    ByteString bs = ByteString.copyFrom(out.toByteArray());
+    GetTokenResponseProto response = GetTokenResponseProto.newBuilder().setToken(bs).build();
+    return response;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/2d2562c4/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapManagementProtocolClientImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapManagementProtocolClientImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapManagementProtocolClientImpl.java
new file mode 100644
index 0000000..e293a95
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapManagementProtocolClientImpl.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.daemon.impl;
+
+import javax.annotation.Nullable;
+import javax.net.SocketFactory;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.ProtocolProxy;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.hive.llap.daemon.LlapManagementProtocolBlockingPB;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto;
+import org.apache.hadoop.security.UserGroupInformation;
+
+public class LlapManagementProtocolClientImpl implements LlapManagementProtocolBlockingPB {
+
+  private final Configuration conf;
+  private final InetSocketAddress serverAddr;
+  private final RetryPolicy retryPolicy;
+  private final SocketFactory socketFactory;
+  LlapManagementProtocolBlockingPB proxy;
+
+
+  public LlapManagementProtocolClientImpl(Configuration conf, String hostname, int port,
+                                      @Nullable RetryPolicy retryPolicy,
+                                      @Nullable SocketFactory socketFactory) {
+    this.conf = conf;
+    this.serverAddr = NetUtils.createSocketAddr(hostname, port);
+    this.retryPolicy = retryPolicy;
+    if (socketFactory == null) {
+      this.socketFactory = NetUtils.getDefaultSocketFactory(conf);
+    } else {
+      this.socketFactory = socketFactory;
+    }
+  }
+
+  public LlapManagementProtocolBlockingPB getProxy() throws IOException {
+    if (proxy == null) {
+      proxy = createProxy();
+    }
+    return proxy;
+  }
+
+  public LlapManagementProtocolBlockingPB createProxy() throws IOException {
+    RPC.setProtocolEngine(conf, LlapManagementProtocolBlockingPB.class, ProtobufRpcEngine.class);
+    ProtocolProxy<LlapManagementProtocolBlockingPB> proxy =
+        RPC.getProtocolProxy(LlapManagementProtocolBlockingPB.class, 0, serverAddr,
+            UserGroupInformation.getCurrentUser(), conf, NetUtils.getDefaultSocketFactory(conf), 0,
+            retryPolicy);
+    return proxy.getProxy();
+  }
+
+  @Override
+  public GetTokenResponseProto getDelegationToken(RpcController controller,
+      GetTokenRequestProto request) throws ServiceException {
+    try {
+      return getProxy().getDelegationToken(null, request);
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/2d2562c4/llap-server/src/java/org/apache/hadoop/hive/llap/protocol/LlapTaskUmbilicalProtocol.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/protocol/LlapTaskUmbilicalProtocol.java b/llap-server/src/java/org/apache/hadoop/hive/llap/protocol/LlapTaskUmbilicalProtocol.java
index fae7654..9549567 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/protocol/LlapTaskUmbilicalProtocol.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/protocol/LlapTaskUmbilicalProtocol.java
@@ -18,11 +18,14 @@ import java.io.IOException;
 
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ipc.VersionedProtocol;
+import org.apache.hadoop.security.token.TokenInfo;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.runtime.api.impl.TezHeartbeatRequest;
 import org.apache.tez.runtime.api.impl.TezHeartbeatResponse;
+import org.apache.tez.runtime.common.security.JobTokenSelector;
 
+@TokenInfo(JobTokenSelector.class)
 public interface LlapTaskUmbilicalProtocol extends VersionedProtocol {
 
   public static final long versionID = 1L;

http://git-wip-us.apache.org/repos/asf/hive/blob/2d2562c4/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapDaemonPolicyProvider.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapDaemonPolicyProvider.java b/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapDaemonPolicyProvider.java
new file mode 100644
index 0000000..d67647b
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapDaemonPolicyProvider.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.security;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.llap.daemon.LlapDaemonProtocolBlockingPB;
+import org.apache.hadoop.hive.llap.daemon.LlapManagementProtocolBlockingPB;
+import org.apache.hadoop.security.authorize.PolicyProvider;
+import org.apache.hadoop.security.authorize.Service;
+
+public class LlapDaemonPolicyProvider extends PolicyProvider {
+  private static final Service[] services = new Service[] {
+    new Service(HiveConf.ConfVars.LLAP_SECURITY_ACL.varname,
+        LlapDaemonProtocolBlockingPB.class),
+    new Service(HiveConf.ConfVars.LLAP_MANAGEMENT_ACL.varname,
+        LlapManagementProtocolBlockingPB.class)
+  };
+
+  @Override
+  public Service[] getServices() {
+    return services;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/2d2562c4/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapSecurityHelper.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapSecurityHelper.java b/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapSecurityHelper.java
new file mode 100644
index 0000000..a00b631
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapSecurityHelper.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.security;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import javax.net.SocketFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.llap.daemon.impl.LlapManagementProtocolClientImpl;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto;
+import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
+import org.apache.hadoop.hive.llap.registry.ServiceInstance;
+import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet;
+import org.apache.hadoop.io.DataInputByteBuffer;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.protobuf.ByteString;
+
+/** Individual instances of this class are not thread safe. */
+public class LlapSecurityHelper implements LlapTokenProvider {
+  private static final Logger LOG = LoggerFactory.getLogger(LlapSecurityHelper.class);
+
+  private UserGroupInformation llapUgi;
+
+  private final LlapRegistryService registry;
+  private ServiceInstanceSet activeInstances;
+  private final Configuration conf;
+  private LlapManagementProtocolClientImpl client;
+
+  private final SocketFactory socketFactory;
+  private final RetryPolicy retryPolicy;
+
+  public LlapSecurityHelper(Configuration conf) {
+    this.conf = conf;
+    registry = new LlapRegistryService(false);
+    registry.init(conf);
+    socketFactory = NetUtils.getDefaultSocketFactory(conf);
+    retryPolicy = RetryPolicies.retryUpToMaximumTimeWithFixedSleep(
+        16000, 2000l, TimeUnit.MILLISECONDS);
+  }
+
+  public static UserGroupInformation loginWithKerberos(
+      String principal, String keytabFile) throws IOException {
+    if (!UserGroupInformation.isSecurityEnabled()) return null;
+    if (principal.isEmpty() || keytabFile.isEmpty()) {
+      throw new RuntimeException("Kerberos principal and/or keytab are empty");
+    }
+    LOG.info("Logging in as " + principal + " via " + keytabFile);
+    UserGroupInformation.loginUserFromKeytab(
+        SecurityUtil.getServerPrincipal(principal, "0.0.0.0"), keytabFile);
+    return UserGroupInformation.getLoginUser();
+  }
+
+  @Override
+  public Token<LlapTokenIdentifier> getDelegationToken() throws IOException {
+    if (!UserGroupInformation.isSecurityEnabled()) return null;
+    if (llapUgi == null) {
+      llapUgi = UserGroupInformation.getCurrentUser();
+      // We could have also added keytab support; right now client must do smth like kinit.
+    }
+    Iterator<ServiceInstance> llaps = null;
+    ServiceInstance someLlap = null;
+    if (client == null) {
+      llaps = getLlapServices(false);
+      someLlap = llaps.next();
+    }
+
+    ByteString tokenBytes = null;
+    boolean hasRefreshed = false;
+    while (true) {
+      try {
+        tokenBytes = getTokenBytes(someLlap);
+        break;
+      } catch (InterruptedException ie) {
+        throw new RuntimeException(ie);
+      } catch (IOException ex) {
+        LOG.error("Cannot get a token, trying a different instance", ex);
+        client = null;
+      }
+      if (llaps == null || !llaps.hasNext()) {
+        if (hasRefreshed) { // Only refresh once.
+          throw new RuntimeException("Cannot find any LLAPs to get the token from");
+        }
+        llaps = getLlapServices(true);
+        hasRefreshed = true;
+      }
+      someLlap = llaps.next();
+    }
+
+    // Stupid protobuf byte-buffer reinvention.
+    Token<LlapTokenIdentifier> token = new Token<>();
+    DataInputByteBuffer in = new DataInputByteBuffer();
+    in.reset(tokenBytes.asReadOnlyByteBuffer());
+    token.readFields(in);
+    return token;
+  }
+
+  private ByteString getTokenBytes(
+      final ServiceInstance si) throws InterruptedException, IOException {
+    return llapUgi.doAs(new PrivilegedExceptionAction<ByteString>() {
+      @Override
+      public ByteString run() throws Exception {
+        if (client == null) {
+          client = new LlapManagementProtocolClientImpl(
+            conf, si.getHost(), si.getManagementPort(), retryPolicy, socketFactory);
+        }
+        // Client only connects on the first call, so this has to be done in doAs.
+        GetTokenRequestProto req = GetTokenRequestProto.newBuilder().build();
+        return client.getDelegationToken(null, req).getToken();
+      }
+    });
+  }
+
+  private Iterator<ServiceInstance> getLlapServices(boolean doForceRefresh) throws IOException {
+    if (activeInstances == null) {
+      registry.start();
+      activeInstances = registry.getInstances();
+    }
+    Map<String, ServiceInstance> daemons = activeInstances.getAll();
+    if (doForceRefresh || daemons == null || daemons.isEmpty()) {
+      activeInstances.refresh();
+      daemons = activeInstances.getAll();
+      if (daemons == null || daemons.isEmpty()) throw new RuntimeException("No LLAPs found");
+    }
+    return daemons.values().iterator();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/2d2562c4/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapServerSecurityInfo.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapServerSecurityInfo.java b/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapServerSecurityInfo.java
new file mode 100644
index 0000000..4dca2ce
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapServerSecurityInfo.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.security;
+
+import java.lang.annotation.Annotation;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.llap.daemon.LlapDaemonProtocolBlockingPB;
+import org.apache.hadoop.hive.llap.daemon.LlapManagementProtocolBlockingPB;
+import org.apache.hadoop.security.KerberosInfo;
+import org.apache.hadoop.security.SecurityInfo;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.security.token.TokenInfo;
+import org.apache.hadoop.security.token.TokenSelector;
+
+public class LlapServerSecurityInfo extends SecurityInfo {
+  private static final Log LOG = LogFactory.getLog(LlapServerSecurityInfo.class);
+
+  @Override
+  public KerberosInfo getKerberosInfo(Class<?> protocol, Configuration conf) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Trying to get KerberosInfo for " + protocol);
+    }
+    if (!LlapDaemonProtocolBlockingPB.class.isAssignableFrom(protocol)
+        && !LlapManagementProtocolBlockingPB.class.isAssignableFrom(protocol)) return null;
+    return new KerberosInfo() {
+      @Override
+      public Class<? extends Annotation> annotationType() {
+        return null;
+      }
+
+      @Override
+      public String serverPrincipal() {
+        return HiveConf.ConfVars.LLAP_KERBEROS_PRINCIPAL.varname;
+      }
+
+      @Override
+      public String clientPrincipal() {
+        return null;
+      }
+    };
+  }
+
+  @Override
+  public TokenInfo getTokenInfo(Class<?> protocol, Configuration conf) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Trying to get TokenInfo for " + protocol);
+    }
+    // Tokens cannot be used for the management protocol (for now).
+    if (!LlapDaemonProtocolBlockingPB.class.isAssignableFrom(protocol)) return null;
+    return new TokenInfo() {
+      @Override
+      public Class<? extends Annotation> annotationType() {
+        return null;
+      }
+
+      @Override
+      public Class<? extends TokenSelector<? extends TokenIdentifier>> value() {
+        return LlapTokenSelector.class;
+      }
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/2d2562c4/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapTokenSelector.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapTokenSelector.java b/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapTokenSelector.java
new file mode 100644
index 0000000..b6e7499
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapTokenSelector.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.security;
+
+import java.util.Collection;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.security.token.TokenSelector;
+
+public class LlapTokenSelector implements TokenSelector<LlapTokenIdentifier> {
+  private static final Log LOG = LogFactory.getLog(LlapTokenSelector.class);
+
+  @Override
+  public Token<LlapTokenIdentifier> selectToken(Text service,
+      Collection<Token<? extends TokenIdentifier>> tokens) {
+    if (service == null) return null;
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Looking for a token with service " + service);
+    }
+    for (Token<? extends TokenIdentifier> token : tokens) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Token = " + token.getKind() + "; service = " + token.getService());
+      }
+      if (LlapTokenIdentifier.KIND_NAME.equals(token.getKind())
+          && service.equals(token.getService())) {
+        @SuppressWarnings("unchecked")
+        Token<LlapTokenIdentifier> result = (Token<LlapTokenIdentifier>)token;
+        return result;
+      }
+    }
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/2d2562c4/llap-server/src/java/org/apache/hadoop/hive/llap/security/SecretManager.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/security/SecretManager.java b/llap-server/src/java/org/apache/hadoop/hive/llap/security/SecretManager.java
new file mode 100644
index 0000000..dc4e81a
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/security/SecretManager.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.security;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.security.PrivilegedAction;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.delegation.ZKDelegationTokenSecretManager;
+import org.apache.hadoop.security.token.delegation.web.DelegationTokenManager;
+
+/**
+ * ZooKeeper-backed delegation token secret manager for LLAP, parameterized
+ * with {@link LlapTokenIdentifier}.
+ */
+public class SecretManager extends ZKDelegationTokenSecretManager<LlapTokenIdentifier> {
+  public SecretManager(Configuration conf) {
+    super(conf);
+  }
+
+  /** Creates an empty LLAP token identifier, to be populated by the framework. */
+  @Override
+  public LlapTokenIdentifier createIdentifier() {
+    return new LlapTokenIdentifier();
+  }
+
+  /**
+   * Deserializes an {@link LlapTokenIdentifier} from the token's identifier bytes.
+   * @param token the token whose identifier bytes should be decoded.
+   * @throws IOException if the identifier cannot be read.
+   */
+  @Override
+  public LlapTokenIdentifier decodeTokenIdentifier(
+      Token<LlapTokenIdentifier> token) throws IOException {
+    DataInputStream dis = new DataInputStream(new ByteArrayInputStream(token.getIdentifier()));
+    LlapTokenIdentifier id = new LlapTokenIdentifier();
+    id.readFields(dis);
+    dis.close();
+    return id;
+  }
+
+  /**
+   * Creates and starts a ZK-backed secret manager. Logs in to Kerberos with the
+   * configured (or supplied default) principal/keytab and creates the ZK
+   * connection under the resulting UGI.
+   * @param conf configuration; LLAP ZKSM and delegation token settings are read from it.
+   * @param llapPrincipal default Kerberos principal, used if not set in conf.
+   * @param llapKeytab default keytab path, used if not set in conf.
+   */
+  public static SecretManager createSecretManager(
+      final Configuration conf, String llapPrincipal, String llapKeytab) {
+    // Create ZK connection under a separate ugi (if specified) - ZK works in mysterious ways.
+    UserGroupInformation zkUgi = null;
+    String principal = HiveConf.getVar(conf, ConfVars.LLAP_ZKSM_KERBEROS_PRINCIPAL, llapPrincipal);
+    String keyTab = HiveConf.getVar(conf, ConfVars.LLAP_ZKSM_KERBEROS_KEYTAB_FILE, llapKeytab);
+    try {
+      zkUgi = LlapSecurityHelper.loginWithKerberos(principal, keyTab);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    // Override the default delegation token lifetime for LLAP.
+    // Also set all the necessary ZK settings to defaults and LLAP configs, if not set.
+    final Configuration zkConf = new Configuration(conf);
+    zkConf.setLong(DelegationTokenManager.MAX_LIFETIME,
+        HiveConf.getTimeVar(conf, ConfVars.LLAP_DELEGATION_TOKEN_LIFETIME, TimeUnit.SECONDS));
+    zkConf.set(SecretManager.ZK_DTSM_ZK_KERBEROS_PRINCIPAL, principal);
+    zkConf.set(SecretManager.ZK_DTSM_ZK_KERBEROS_KEYTAB, keyTab);
+    setZkConfIfNotSet(zkConf, SecretManager.ZK_DTSM_ZNODE_WORKING_PATH, "llapzkdtsm");
+    setZkConfIfNotSet(zkConf, SecretManager.ZK_DTSM_ZK_AUTH_TYPE, "sasl");
+    setZkConfIfNotSet(zkConf, SecretManager.ZK_DTSM_ZK_CONNECTION_STRING,
+        HiveConf.getVar(zkConf, ConfVars.LLAP_ZKSM_ZK_CONNECTION_STRING));
+    // Construct and start the manager as the Kerberos-login UGI so the ZK
+    // client threads it spawns authenticate with the right credentials.
+    return zkUgi.doAs(new PrivilegedAction<SecretManager>() {
+      @Override
+      public SecretManager run() {
+        SecretManager zkSecretManager = new SecretManager(zkConf);
+        try {
+          zkSecretManager.startThreads();
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+        return zkSecretManager;
+      }
+    });
+  }
+
+  /** Sets a config value only if the caller has not already set it. */
+  private static void setZkConfIfNotSet(Configuration zkConf, String name, String value) {
+    if (zkConf.get(name) != null) return;
+    zkConf.set(name, value);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/2d2562c4/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
index b93650d..ce248e9 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
@@ -25,11 +25,13 @@ import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.BiMap;
 import com.google.common.collect.HashBiMap;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.ServiceException;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.llap.LlapNodeId;
@@ -44,6 +46,7 @@ import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWor
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentRequestProto;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentResponseProto;
 import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol;
+import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier;
 import org.apache.hadoop.hive.llap.tezplugins.helpers.SourceStateTracker;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.Text;
@@ -52,6 +55,8 @@ import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.LocalResource;
@@ -88,12 +93,23 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
   private TaskCommunicator communicator;
   private long deleteDelayOnDagComplete;
   private final LlapTaskUmbilicalProtocol umbilical;
+  private final Token<LlapTokenIdentifier> token;
 
   private volatile String currentDagName;
 
   public LlapTaskCommunicator(
       TaskCommunicatorContext taskCommunicatorContext) {
     super(taskCommunicatorContext);
+    Credentials credentials = taskCommunicatorContext.getCredentials();
+    if (credentials != null) {
+      @SuppressWarnings("unchecked")
+      Token<LlapTokenIdentifier> llapToken =
+          (Token<LlapTokenIdentifier>)credentials.getToken(LlapTokenIdentifier.KIND_NAME);
+      this.token = llapToken;
+    } else {
+      this.token = null;
+    }
+    Preconditions.checkState((token != null) == UserGroupInformation.isSecurityEnabled());
 
     umbilical = new LlapTaskUmbilicalProtocolImpl(getUmbilical());
     SubmitWorkRequestProto.Builder baseBuilder = SubmitWorkRequestProto.newBuilder();
@@ -117,7 +133,7 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
     super.initialize();
     Configuration conf = getConf();
     int numThreads = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_COMMUNICATOR_NUM_THREADS);
-    this.communicator = new TaskCommunicator(numThreads, conf);
+    this.communicator = new TaskCommunicator(numThreads, conf, token);
     this.deleteDelayOnDagComplete = HiveConf.getTimeVar(
         conf, ConfVars.LLAP_FILE_CLEANUP_DELAY_SECONDS, TimeUnit.SECONDS);
     LOG.info("Running LlapTaskCommunicator with "
@@ -158,7 +174,9 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
           .setNumHandlers(numHandlers)
           .setSecretManager(jobTokenSecretManager).build();
 
-      // Do serviceACLs need to be refreshed, like in Tez ?
+      if (conf.getBoolean(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, false)) {
+        server.refreshServiceAcl(conf, new LlapUmbilicalPolicyProvider());
+      }
 
       server.start();
       this.address = NetUtils.getConnectAddress(server);

http://git-wip-us.apache.org/repos/asf/hive/blob/2d2562c4/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapUmbilicalPolicyProvider.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapUmbilicalPolicyProvider.java b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapUmbilicalPolicyProvider.java
new file mode 100644
index 0000000..4102f5b
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapUmbilicalPolicyProvider.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.tezplugins;
+
+import java.util.Arrays;
+
+import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol;
+import org.apache.hadoop.security.authorize.Service;
+import org.apache.tez.dag.api.TezConstants;
+import org.apache.tez.dag.app.security.authorize.TezAMPolicyProvider;
+
+/**
+ * Service-level authorization policy provider for the LLAP task umbilical.
+ * Extends the Tez AM policy with an entry for {@link LlapTaskUmbilicalProtocol}
+ * so the umbilical endpoint participates in Hadoop service ACL checks.
+ */
+public class LlapUmbilicalPolicyProvider extends TezAMPolicyProvider {
+
+  // Must be volatile: the unsynchronized fast-path read in getServices() relies
+  // on safe publication of the fully initialized array. Without volatile this is
+  // broken double-checked locking under the Java Memory Model.
+  private static volatile Service[] services;
+  private static final Object servicesLock = new Object();
+
+  @Override
+  public Service[] getServices() {
+    if (services != null) return services;
+    synchronized (servicesLock) {
+      if (services != null) return services;
+      // Append the LLAP umbilical protocol to the parent's service list.
+      Service[] parentSvc = super.getServices();
+      Service[] result = Arrays.copyOf(parentSvc, parentSvc.length + 1);
+      result[parentSvc.length] = new Service(
+          TezConstants.TEZ_AM_SECURITY_SERVICE_AUTHORIZATION_TASK_UMBILICAL,
+          LlapTaskUmbilicalProtocol.class);
+      return (services = result);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/2d2562c4/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/TaskCommunicator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/TaskCommunicator.java b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/TaskCommunicator.java
index 8144165..f9ca677 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/TaskCommunicator.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/TaskCommunicator.java
@@ -15,6 +15,11 @@
 package org.apache.hadoop.hive.llap.tezplugins;
 
 import javax.net.SocketFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.security.PrivilegedAction;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -55,9 +60,13 @@ import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWor
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentRequestProto;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentResponseProto;
+import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier;
 import org.apache.hadoop.io.retry.RetryPolicies;
 import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.service.AbstractService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -74,12 +83,14 @@ public class TaskCommunicator extends AbstractService {
 
   private final ListeningExecutorService requestManagerExecutor;
   private volatile ListenableFuture<Void> requestManagerFuture;
+  private final Token<LlapTokenIdentifier> llapToken;
 
-
-  public TaskCommunicator(int numThreads, Configuration conf) {
+  public TaskCommunicator(
+      int numThreads, Configuration conf, Token<LlapTokenIdentifier> llapToken) {
     super(TaskCommunicator.class.getSimpleName());
     this.hostProxies = new ConcurrentHashMap<>();
     this.socketFactory = NetUtils.getDefaultSocketFactory(conf);
+    this.llapToken = llapToken;
 
     long connectionTimeout = HiveConf.getTimeVar(conf,
         ConfVars.LLAP_TASK_COMMUNICATOR_CONNECTION_TIMEOUT_MS, TimeUnit.MILLISECONDS);
@@ -458,13 +469,34 @@ public class TaskCommunicator extends AbstractService {
     void indicateError(Throwable t);
   }
 
-  private LlapDaemonProtocolBlockingPB getProxy(LlapNodeId nodeId) {
+  private LlapDaemonProtocolBlockingPB getProxy(final LlapNodeId nodeId) {
     String hostId = getHostIdentifier(nodeId.getHostname(), nodeId.getPort());
 
     LlapDaemonProtocolBlockingPB proxy = hostProxies.get(hostId);
     if (proxy == null) {
-      proxy = new LlapDaemonProtocolClientImpl(getConfig(), nodeId.getHostname(), nodeId.getPort(),
-          retryPolicy, socketFactory);
+      if (llapToken == null) {
+        proxy = new LlapDaemonProtocolClientImpl(getConfig(), nodeId.getHostname(),
+            nodeId.getPort(), retryPolicy, socketFactory);
+      } else {
+        UserGroupInformation ugi;
+        try {
+          ugi = UserGroupInformation.getCurrentUser();
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+        Token<LlapTokenIdentifier> nodeToken = new Token<LlapTokenIdentifier>(llapToken);
+        SecurityUtil.setTokenService(nodeToken, NetUtils.createSocketAddrForHost(
+            nodeId.getHostname(), nodeId.getPort()));
+        ugi.addToken(nodeToken);
+        proxy = ugi.doAs(new PrivilegedAction<LlapDaemonProtocolBlockingPB>() {
+          @Override
+          public LlapDaemonProtocolBlockingPB run() {
+           return new LlapDaemonProtocolClientImpl(getConfig(), nodeId.getHostname(),
+               nodeId.getPort(), retryPolicy, socketFactory);
+          }
+        });
+      }
+
       LlapDaemonProtocolBlockingPB proxyOld = hostProxies.putIfAbsent(hostId, proxy);
       if (proxyOld != null) {
         // TODO Shutdown the new proxy.

http://git-wip-us.apache.org/repos/asf/hive/blob/2d2562c4/llap-server/src/main/resources/META-INF/services/org.apache.hadoop.security.SecurityInfo
----------------------------------------------------------------------
diff --git a/llap-server/src/main/resources/META-INF/services/org.apache.hadoop.security.SecurityInfo b/llap-server/src/main/resources/META-INF/services/org.apache.hadoop.security.SecurityInfo
new file mode 100644
index 0000000..dcc6988
--- /dev/null
+++ b/llap-server/src/main/resources/META-INF/services/org.apache.hadoop.security.SecurityInfo
@@ -0,0 +1,14 @@
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+org.apache.hadoop.hive.llap.security.LlapServerSecurityInfo

http://git-wip-us.apache.org/repos/asf/hive/blob/2d2562c4/llap-server/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer
----------------------------------------------------------------------
diff --git a/llap-server/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer b/llap-server/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer
new file mode 100644
index 0000000..e80ac41
--- /dev/null
+++ b/llap-server/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer
@@ -0,0 +1,14 @@
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+org.apache.hadoop.hive.llap.security.LlapTokenIdentifier.Renewer

http://git-wip-us.apache.org/repos/asf/hive/blob/2d2562c4/llap-server/src/protobuf/LlapDaemonProtocol.proto
----------------------------------------------------------------------
diff --git a/llap-server/src/protobuf/LlapDaemonProtocol.proto b/llap-server/src/protobuf/LlapDaemonProtocol.proto
index 0ba6acf..07721df 100644
--- a/llap-server/src/protobuf/LlapDaemonProtocol.proto
+++ b/llap-server/src/protobuf/LlapDaemonProtocol.proto
@@ -117,9 +117,20 @@ message TerminateFragmentRequestProto {
 message TerminateFragmentResponseProto {
 }
 
+message GetTokenRequestProto {
+}
+
+message GetTokenResponseProto {
+  optional bytes token = 1;
+}
+
 service LlapDaemonProtocol {
   rpc submitWork(SubmitWorkRequestProto) returns (SubmitWorkResponseProto);
   rpc sourceStateUpdated(SourceStateUpdatedRequestProto) returns (SourceStateUpdatedResponseProto);
   rpc queryComplete(QueryCompleteRequestProto) returns (QueryCompleteResponseProto);
   rpc terminateFragment(TerminateFragmentRequestProto) returns (TerminateFragmentResponseProto);
 }
+
+service LlapManagementProtocol {
+  rpc getDelegationToken(GetTokenRequestProto) returns (GetTokenResponseProto);
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/2d2562c4/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java
index 52ba360..deade5f 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java
@@ -143,7 +143,7 @@ public class MiniLlapCluster extends AbstractService {
   @Override
   public void serviceInit(Configuration conf) {
     llapDaemon = new LlapDaemon(conf, numExecutorsPerService, execBytesPerService, llapIoEnabled,
-        ioIsDirect, ioBytesPerService, localDirs, 0, 0);
+        ioIsDirect, ioBytesPerService, localDirs, 0, 0, 0);
     llapDaemon.init(conf);
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/2d2562c4/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemonProtocolServerImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemonProtocolServerImpl.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemonProtocolServerImpl.java
index bf8a673..0006a9a 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemonProtocolServerImpl.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemonProtocolServerImpl.java
@@ -39,7 +39,8 @@ public class TestLlapDaemonProtocolServerImpl {
     int numHandlers = HiveConf.getIntVar(daemonConf, ConfVars.LLAP_DAEMON_RPC_NUM_HANDLERS);
     LlapDaemonProtocolServerImpl server =
         new LlapDaemonProtocolServerImpl(numHandlers, mock(ContainerRunner.class),
-            new AtomicReference<InetSocketAddress>(), rpcPort);
+           new AtomicReference<InetSocketAddress>(), new AtomicReference<InetSocketAddress>(),
+           rpcPort, rpcPort + 1);
 
     try {
       server.init(new Configuration());

http://git-wip-us.apache.org/repos/asf/hive/blob/2d2562c4/ql/src/gen/protobuf/gen-java/org/apache/hadoop/hive/ql/io/orc/OrcProto.java
----------------------------------------------------------------------
diff --git a/ql/src/gen/protobuf/gen-java/org/apache/hadoop/hive/ql/io/orc/OrcProto.java b/ql/src/gen/protobuf/gen-java/org/apache/hadoop/hive/ql/io/orc/OrcProto.java
index f4cc240..5202476 100644
--- a/ql/src/gen/protobuf/gen-java/org/apache/hadoop/hive/ql/io/orc/OrcProto.java
+++ b/ql/src/gen/protobuf/gen-java/org/apache/hadoop/hive/ql/io/orc/OrcProto.java
@@ -17765,6 +17765,7 @@ public final class OrcProto {
      * Version of the writer:
      *   0 (or missing) = original
      *   1 = HIVE-8732 fixed
+     *   2 = HIVE-4243 fixed
      * </pre>
      */
     boolean hasWriterVersion();
@@ -17775,6 +17776,7 @@ public final class OrcProto {
      * Version of the writer:
      *   0 (or missing) = original
      *   1 = HIVE-8732 fixed
+     *   2 = HIVE-4243 fixed
      * </pre>
      */
     int getWriterVersion();
@@ -18077,6 +18079,7 @@ public final class OrcProto {
      * Version of the writer:
      *   0 (or missing) = original
      *   1 = HIVE-8732 fixed
+     *   2 = HIVE-4243 fixed
      * </pre>
      */
     public boolean hasWriterVersion() {
@@ -18089,6 +18092,7 @@ public final class OrcProto {
      * Version of the writer:
      *   0 (or missing) = original
      *   1 = HIVE-8732 fixed
+     *   2 = HIVE-4243 fixed
      * </pre>
      */
     public int getWriterVersion() {
@@ -18759,6 +18763,7 @@ public final class OrcProto {
        * Version of the writer:
        *   0 (or missing) = original
        *   1 = HIVE-8732 fixed
+       *   2 = HIVE-4243 fixed
        * </pre>
        */
       public boolean hasWriterVersion() {
@@ -18771,6 +18776,7 @@ public final class OrcProto {
        * Version of the writer:
        *   0 (or missing) = original
        *   1 = HIVE-8732 fixed
+       *   2 = HIVE-4243 fixed
        * </pre>
        */
       public int getWriterVersion() {
@@ -18783,6 +18789,7 @@ public final class OrcProto {
        * Version of the writer:
        *   0 (or missing) = original
        *   1 = HIVE-8732 fixed
+       *   2 = HIVE-4243 fixed
        * </pre>
        */
       public Builder setWriterVersion(int value) {
@@ -18798,6 +18805,7 @@ public final class OrcProto {
        * Version of the writer:
        *   0 (or missing) = original
        *   1 = HIVE-8732 fixed
+       *   2 = HIVE-4243 fixed
        * </pre>
        */
       public Builder clearWriterVersion() {

http://git-wip-us.apache.org/repos/asf/hive/blob/2d2562c4/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
index 1264421..ea12fe1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
@@ -2992,7 +2992,7 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
           cols.addAll(tbl.getPartCols());
         }
       } else {
-        cols = Hive.getFieldsFromDeserializer(colPath, deserializer); // TODO#: here - desc
+        cols = Hive.getFieldsFromDeserializer(colPath, deserializer);
         if (descTbl.isFormatted()) {
           // when column name is specified in describe table DDL, colPath will
           // will be table_name.column_name

http://git-wip-us.apache.org/repos/asf/hive/blob/2d2562c4/ql/src/java/org/apache/hadoop/hive/ql/exec/GlobalWorkMapFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/GlobalWorkMapFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/GlobalWorkMapFactory.java
index 7c38dc3..338e495 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/GlobalWorkMapFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/GlobalWorkMapFactory.java
@@ -28,7 +28,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.session.SessionState;
-import org.apache.hadoop.hive.llap.io.api.LlapIoProxy;
+import org.apache.hadoop.hive.llap.io.api.LlapProxy;
 
 public class GlobalWorkMapFactory {
 
@@ -99,7 +99,7 @@ public class GlobalWorkMapFactory {
   DummyMap<Path, BaseWork> dummy = new DummyMap<Path, BaseWork>();
 
   public Map<Path, BaseWork> get(Configuration conf) {
-    if (LlapIoProxy.isDaemon()
+    if (LlapProxy.isDaemon()
         || (SessionState.get() != null && SessionState.get().isHiveServerQuery())
         || HiveConf.getVar(conf, ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) {
       if (threadLocalWorkMap == null) {

http://git-wip-us.apache.org/repos/asf/hive/blob/2d2562c4/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCacheFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCacheFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCacheFactory.java
index 3d9771a..5201120 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCacheFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCacheFactory.java
@@ -24,8 +24,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.llap.io.api.LlapIoProxy;
-import org.apache.hadoop.hive.llap.io.api.LlapIoProxy;
+import org.apache.hadoop.hive.llap.io.api.LlapProxy;
+import org.apache.hadoop.hive.llap.io.api.LlapProxy;
 import org.apache.hadoop.hive.ql.exec.tez.LlapObjectCache;
 
 /**
@@ -46,7 +46,7 @@ public class ObjectCacheFactory {
    */
   public static ObjectCache getCache(Configuration conf, String queryId) {
     if (HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) {
-      if (LlapIoProxy.isDaemon()) { // daemon
+      if (LlapProxy.isDaemon()) { // daemon
         if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_OBJECT_CACHE_ENABLED)) {
           // LLAP object cache, unlike others, does not use globals. Thus, get the existing one.
           return getLlapObjectCache(queryId);

http://git-wip-us.apache.org/repos/asf/hive/blob/2d2562c4/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
index 914b4e7..ee62ab3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
@@ -34,7 +34,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.llap.io.api.LlapIoProxy;
+import org.apache.hadoop.hive.llap.io.api.LlapProxy;
 import org.apache.hadoop.hive.ql.exec.DummyStoreOperator;
 import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator;
 import org.apache.hadoop.hive.ql.exec.MapOperator;
@@ -96,7 +96,7 @@ public class MapRecordProcessor extends RecordProcessor {
   public MapRecordProcessor(final JobConf jconf, final ProcessorContext context) throws Exception {
     super(jconf, context);
     String queryId = HiveConf.getVar(jconf, HiveConf.ConfVars.HIVEQUERYID);
-    if (LlapIoProxy.isDaemon()) { // do not cache plan
+    if (LlapProxy.isDaemon()) { // do not cache plan
       cache = new org.apache.hadoop.hive.ql.exec.mr.ObjectCache();
     } else {
       cache = ObjectCacheFactory.getCache(jconf, queryId);

http://git-wip-us.apache.org/repos/asf/hive/blob/2d2562c4/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
index efcf88c..0579dbc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
@@ -29,7 +29,7 @@ import java.util.concurrent.Callable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.llap.io.api.LlapIoProxy;
+import org.apache.hadoop.hive.llap.io.api.LlapProxy;
 import org.apache.hadoop.hive.ql.exec.DummyStoreOperator;
 import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator;
 import org.apache.hadoop.hive.ql.exec.MapredContext;
@@ -92,7 +92,7 @@ public class ReduceRecordProcessor  extends RecordProcessor{
     ObjectCache cache;
 
     String queryId = HiveConf.getVar(jconf, HiveConf.ConfVars.HIVEQUERYID);
-    if (LlapIoProxy.isDaemon()) { // don't cache plan
+    if (LlapProxy.isDaemon()) { // don't cache plan
       cache = new org.apache.hadoop.hive.ql.exec.mr.ObjectCache();
     } else {
       cache = ObjectCacheFactory.getCache(jconf, queryId);

http://git-wip-us.apache.org/repos/asf/hive/blob/2d2562c4/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
index 07f26be..e1a8041 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
@@ -18,6 +18,8 @@
 package org.apache.hadoop.hive.ql.exec.tez;
 
 
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -40,6 +42,7 @@ import java.util.concurrent.TimeoutException;
 
 import javax.security.auth.login.LoginException;
 
+import org.apache.commons.codec.binary.Base64;
 import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.commons.io.FilenameUtils;
 import org.apache.hadoop.fs.FileStatus;
@@ -48,11 +51,16 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.llap.io.api.LlapProxy;
+import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier;
+import org.apache.hadoop.hive.llap.security.LlapTokenProvider;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
 import org.apache.hadoop.hive.shims.Utils;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.tez.client.TezClient;
@@ -189,7 +197,8 @@ public class TezSessionState {
     this.queueName = conf.get("tez.queue.name");
     this.doAsEnabled = conf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS);
 
-    final boolean llapMode = "llap".equals(HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_MODE));
+    final boolean llapMode = "llap".equals(HiveConf.getVar(
+        conf, HiveConf.ConfVars.HIVE_EXECUTION_MODE));
 
     UserGroupInformation ugi = Utils.getUGI();
     user = ugi.getShortUserName();
@@ -258,19 +267,25 @@ public class TezSessionState {
     conf.stripHiddenConfigurations(tezConfig);
 
     ServicePluginsDescriptor servicePluginsDescriptor;
-    UserPayload servicePluginPayload = TezUtils.createUserPayloadFromConf(tezConfig);
 
+    Credentials llapCredentials = null;
     if (llapMode) {
+      if (UserGroupInformation.isSecurityEnabled()) {
+        LlapTokenProvider tp = LlapProxy.getOrInitTokenProvider(conf);
+        Token<LlapTokenIdentifier> token = tp.getDelegationToken();
+        LOG.info("Obtained a token: " + token);
+        llapCredentials = new Credentials();
+        llapCredentials.addToken(LlapTokenIdentifier.KIND_NAME, token);
+      }
+      UserPayload servicePluginPayload = TezUtils.createUserPayloadFromConf(tezConfig);
       // we need plugins to handle llap and uber mode
       servicePluginsDescriptor = ServicePluginsDescriptor.create(true,
-          new TaskSchedulerDescriptor[]{
-              TaskSchedulerDescriptor.create(LLAP_SERVICE, LLAP_SCHEDULER)
-                  .setUserPayload(servicePluginPayload)},
-          new ContainerLauncherDescriptor[]{
-              ContainerLauncherDescriptor.create(LLAP_SERVICE, LLAP_LAUNCHER)},
-          new TaskCommunicatorDescriptor[]{
-              TaskCommunicatorDescriptor.create(LLAP_SERVICE, LLAP_TASK_COMMUNICATOR)
-                  .setUserPayload(servicePluginPayload)});
+          new TaskSchedulerDescriptor[] { TaskSchedulerDescriptor.create(
+              LLAP_SERVICE, LLAP_SCHEDULER).setUserPayload(servicePluginPayload) },
+          new ContainerLauncherDescriptor[] { ContainerLauncherDescriptor.create(
+              LLAP_SERVICE, LLAP_LAUNCHER) },
+          new TaskCommunicatorDescriptor[] { TaskCommunicatorDescriptor.create(
+              LLAP_SERVICE, LLAP_TASK_COMMUNICATOR).setUserPayload(servicePluginPayload) });
     } else {
       servicePluginsDescriptor = ServicePluginsDescriptor.create(true);
     }
@@ -286,7 +301,8 @@ public class TezSessionState {
 
     final TezClient session = TezClient.newBuilder("HIVE-" + sessionId, tezConfig)
         .setIsSession(true).setLocalResources(commonLocalResources)
-        .setServicePluginDescriptor(servicePluginsDescriptor).build();
+        .setCredentials(llapCredentials).setServicePluginDescriptor(servicePluginsDescriptor)
+        .build();
 
     LOG.info("Opening new Tez Session (id: " + sessionId
         + ", scratch dir: " + tezScratchDir + ")");

http://git-wip-us.apache.org/repos/asf/hive/blob/2d2562c4/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
index b19c70a..bdf5dc2 100755
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
@@ -40,7 +40,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.io.HiveIOExceptionHandlerUtil;
 import org.apache.hadoop.hive.llap.io.api.LlapIo;
-import org.apache.hadoop.hive.llap.io.api.LlapIoProxy;
+import org.apache.hadoop.hive.llap.io.api.LlapProxy;
 import org.apache.hadoop.hive.ql.exec.spark.SparkDynamicPartitionPruner;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.exec.Operator;
@@ -214,7 +214,7 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
       LOG.debug("Wrapping " + inputFormat);
     }
     @SuppressWarnings("unchecked")
-    LlapIo<VectorizedRowBatch> llapIo = LlapIoProxy.getIo();
+    LlapIo<VectorizedRowBatch> llapIo = LlapProxy.getIo();
     if (llapIo == null) {
       LOG.info("Not using LLAP because IO is not initialized");
       return inputFormat;

http://git-wip-us.apache.org/repos/asf/hive/blob/2d2562c4/serde/src/java/org/apache/hadoop/hive/serde2/AbstractSerDe.java
----------------------------------------------------------------------
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/AbstractSerDe.java b/serde/src/java/org/apache/hadoop/hive/serde2/AbstractSerDe.java
index 9269ff4..9434e91 100644
--- a/serde/src/java/org/apache/hadoop/hive/serde2/AbstractSerDe.java
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/AbstractSerDe.java
@@ -122,6 +122,6 @@ public abstract class AbstractSerDe implements SerDe {
    *        does, in fact, store it inside metastore, based on table parameters.
    */
   public boolean shouldStoreFieldsInMetastore(Map<String, String> tableParams) {
-    return false; // The default, unless SerDe overrides it. TODO#
+    return false; // The default, unless SerDe overrides it.
   }
 }


Mime
View raw message