hive-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject [06/10] hive git commit: HIVE-12934. Refactor llap module structure to allow for a usable client. (Siddharth Seth, reviewed by Sergey Shelukhin)
Date Mon, 01 Feb 2016 17:46:00 GMT
http://git-wip-us.apache.org/repos/asf/hive/blob/4185d9b8/llap-server/src/java/org/apache/hadoop/hive/llap/LlapNodeId.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/LlapNodeId.java b/llap-server/src/java/org/apache/hadoop/hive/llap/LlapNodeId.java
deleted file mode 100644
index 515bf3c..0000000
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/LlapNodeId.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- *  you may not use this file except in compliance with the License.
- *  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.hadoop.hive.llap;
-
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-
-public class LlapNodeId {
-
-  private static final LoadingCache<LlapNodeId, LlapNodeId> CACHE =
-      CacheBuilder.newBuilder().softValues().build(
-          new CacheLoader<LlapNodeId, LlapNodeId>() {
-            @Override
-            public LlapNodeId load(LlapNodeId key) throws Exception {
-              return key;
-            }
-          });
-
-  public static LlapNodeId getInstance(String hostname, int port) {
-    return CACHE.getUnchecked(new LlapNodeId(hostname, port));
-  }
-
-
-  private final String hostname;
-  private final int port;
-
-
-  private LlapNodeId(String hostname, int port) {
-    this.hostname = hostname;
-    this.port = port;
-  }
-
-  public String getHostname() {
-    return hostname;
-  }
-
-  public int getPort() {
-    return port;
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) {
-      return true;
-    }
-    if (o == null || getClass() != o.getClass()) {
-      return false;
-    }
-
-    LlapNodeId that = (LlapNodeId) o;
-
-    if (port != that.port) {
-      return false;
-    }
-    if (!hostname.equals(that.hostname)) {
-      return false;
-    }
-
-    return true;
-  }
-
-  @Override
-  public int hashCode() {
-    int result = hostname.hashCode();
-    result = 1009 * result + port;
-    return result;
-  }
-
-  @Override
-  public String toString() {
-    return hostname + ":" + port;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/4185d9b8/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java
index 544af09..0399798 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java
@@ -25,6 +25,8 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Collection;
 
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
+import org.apache.hadoop.hive.llap.tezplugins.LlapTezUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -217,7 +219,13 @@ public class LlapServiceDriver {
     CompressionUtils.unTar(new Path(libDir, "tez.tar.gz").toString(), libDir.toString(), true);
     lfs.delete(new Path(libDir, "tez.tar.gz"), false);
 
+    // llap-common
+    lfs.copyFromLocalFile(new Path(Utilities.jarFinderGetJar(LlapDaemonProtocolProtos.class)), libDir);
+    // llap-tez
+    lfs.copyFromLocalFile(new Path(Utilities.jarFinderGetJar(LlapTezUtils.class)), libDir);
+    // llap-server
     lfs.copyFromLocalFile(new Path(Utilities.jarFinderGetJar(LlapInputFormat.class)), libDir);
+    // hive-exec
     lfs.copyFromLocalFile(new Path(Utilities.jarFinderGetJar(HiveInputFormat.class)), libDir);
 
     // copy default aux classes (json/hbase)

http://git-wip-us.apache.org/repos/asf/hive/blob/4185d9b8/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/LlapDaemonProtocolBlockingPB.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/LlapDaemonProtocolBlockingPB.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/LlapDaemonProtocolBlockingPB.java
deleted file mode 100644
index 4c09941..0000000
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/LlapDaemonProtocolBlockingPB.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.llap.daemon;
-
-import org.apache.hadoop.ipc.ProtocolInfo;
-import org.apache.hadoop.security.KerberosInfo;
-import org.apache.hadoop.security.token.TokenInfo;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
-import org.apache.hadoop.hive.llap.security.LlapTokenSelector;
-
-@ProtocolInfo(protocolName = "org.apache.hadoop.hive.llap.daemon.LlapDaemonProtocolBlockingPB", protocolVersion = 1)
-@KerberosInfo(serverPrincipal = HiveConf.HIVE_LLAP_DAEMON_SERVICE_PRINCIPAL_NAME)
-@TokenInfo(LlapTokenSelector.class)
-public interface LlapDaemonProtocolBlockingPB extends LlapDaemonProtocolProtos.LlapDaemonProtocol.BlockingInterface {
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/4185d9b8/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/LlapManagementProtocolBlockingPB.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/LlapManagementProtocolBlockingPB.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/LlapManagementProtocolBlockingPB.java
deleted file mode 100644
index 4efadac..0000000
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/LlapManagementProtocolBlockingPB.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.llap.daemon;
-
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
-import org.apache.hadoop.ipc.ProtocolInfo;
-import org.apache.hadoop.security.KerberosInfo;
-
-@ProtocolInfo(protocolName = "org.apache.hadoop.hive.llap.daemon.LlapManagementProtocolBlockingPB", protocolVersion = 1)
-@KerberosInfo(serverPrincipal = HiveConf.HIVE_LLAP_DAEMON_SERVICE_PRINCIPAL_NAME)
-public interface LlapManagementProtocolBlockingPB extends LlapDaemonProtocolProtos.LlapManagementProtocol.BlockingInterface {
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/4185d9b8/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
index 7d7fa00..94b3b41 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
@@ -19,7 +19,6 @@ import java.lang.management.ManagementFactory;
 import java.lang.management.MemoryPoolMXBean;
 import java.lang.management.MemoryType;
 import java.net.InetSocketAddress;
-import java.net.URLClassLoader;
 import java.util.Arrays;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
@@ -48,7 +47,6 @@ import org.apache.hadoop.hive.llap.metrics.LlapMetricsSystem;
 import org.apache.hadoop.hive.llap.metrics.MetricsUtils;
 import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
 import org.apache.hadoop.hive.llap.shufflehandler.ShuffleHandler;
-import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.metrics2.util.MBeans;
 import org.apache.hadoop.service.CompositeService;
 import org.apache.hadoop.util.ExitUtil;
@@ -67,7 +65,7 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
   private static final Logger LOG = LoggerFactory.getLogger(LlapDaemon.class);
 
   private final Configuration shuffleHandlerConf;
-  private final LlapDaemonProtocolServerImpl server;
+  private final LlapProtocolServerImpl server;
   private final ContainerRunnerImpl containerRunner;
   private final AMReporter amReporter;
   private final LlapRegistryService registry;
@@ -166,7 +164,7 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
 
     this.amReporter = new AMReporter(srvAddress, new QueryFailedHandlerProxy(), daemonConf);
 
-    this.server = new LlapDaemonProtocolServerImpl(
+    this.server = new LlapProtocolServerImpl(
         numHandlers, this, srvAddress, mngAddress, srvPort, mngPort);
 
     ClassLoader executorClassLoader = null;

http://git-wip-us.apache.org/repos/asf/hive/blob/4185d9b8/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolClientImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolClientImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolClientImpl.java
deleted file mode 100644
index 9c7d2e2..0000000
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolClientImpl.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.llap.daemon.impl;
-
-import javax.annotation.Nullable;
-import javax.net.SocketFactory;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-
-import com.google.protobuf.RpcController;
-import com.google.protobuf.ServiceException;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteRequestProto;
-import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteResponseProto;
-import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto;
-import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto;
-import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
-import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto;
-import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentRequestProto;
-import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentResponseProto;
-import org.apache.hadoop.io.retry.RetryPolicy;
-import org.apache.hadoop.ipc.ProtobufRpcEngine;
-import org.apache.hadoop.ipc.ProtocolProxy;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.hive.llap.daemon.LlapDaemonProtocolBlockingPB;
-import org.apache.hadoop.security.UserGroupInformation;
-
-// TODO Change all this to be based on a regular interface instead of relying on the Proto service - Exception signatures cannot be controlled without this for the moment.
-
-
-public class LlapDaemonProtocolClientImpl implements LlapDaemonProtocolBlockingPB {
-
-  private final Configuration conf;
-  private final InetSocketAddress serverAddr;
-  private final RetryPolicy retryPolicy;
-  private final SocketFactory socketFactory;
-  LlapDaemonProtocolBlockingPB proxy;
-
-
-  public LlapDaemonProtocolClientImpl(Configuration conf, String hostname, int port,
-                                      @Nullable RetryPolicy retryPolicy,
-                                      @Nullable SocketFactory socketFactory) {
-    this.conf = conf;
-    this.serverAddr = NetUtils.createSocketAddr(hostname, port);
-    this.retryPolicy = retryPolicy;
-    if (socketFactory == null) {
-      this.socketFactory = NetUtils.getDefaultSocketFactory(conf);
-    } else {
-      this.socketFactory = socketFactory;
-    }
-  }
-
-  @Override
-  public SubmitWorkResponseProto submitWork(RpcController controller,
-                                                                     SubmitWorkRequestProto request) throws
-      ServiceException {
-    try {
-      return getProxy().submitWork(null, request);
-    } catch (IOException e) {
-      throw new ServiceException(e);
-    }
-  }
-
-  @Override
-  public SourceStateUpdatedResponseProto sourceStateUpdated(RpcController controller,
-                                                            SourceStateUpdatedRequestProto request) throws
-      ServiceException {
-    try {
-      return getProxy().sourceStateUpdated(null, request);
-    } catch (IOException e) {
-      throw new ServiceException(e);
-    }
-  }
-
-  @Override
-  public QueryCompleteResponseProto queryComplete(RpcController controller,
-                                                  QueryCompleteRequestProto request) throws
-      ServiceException {
-    try {
-      return getProxy().queryComplete(null, request);
-    } catch (IOException e) {
-      throw new ServiceException(e);
-    }
-  }
-
-  @Override
-  public TerminateFragmentResponseProto terminateFragment(
-      RpcController controller,
-      TerminateFragmentRequestProto request) throws ServiceException {
-    try {
-      return getProxy().terminateFragment(null, request);
-    } catch (IOException e) {
-      throw new ServiceException(e);
-    }
-  }
-
-  public LlapDaemonProtocolBlockingPB getProxy() throws IOException {
-    if (proxy == null) {
-      proxy = createProxy();
-    }
-    return proxy;
-  }
-
-  public LlapDaemonProtocolBlockingPB createProxy() throws IOException {
-    RPC.setProtocolEngine(conf, LlapDaemonProtocolBlockingPB.class, ProtobufRpcEngine.class);
-    ProtocolProxy<LlapDaemonProtocolBlockingPB> proxy =
-        RPC.getProtocolProxy(LlapDaemonProtocolBlockingPB.class, 0, serverAddr,
-            UserGroupInformation.getCurrentUser(), conf, NetUtils.getDefaultSocketFactory(conf), 0,
-            retryPolicy);
-    return proxy.getProxy();
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/4185d9b8/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolServerImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolServerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolServerImpl.java
deleted file mode 100644
index 45ca906..0000000
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolServerImpl.java
+++ /dev/null
@@ -1,251 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.llap.daemon.impl;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.security.PrivilegedAction;
-import java.util.concurrent.atomic.AtomicReference;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.io.ByteArrayDataOutput;
-import com.google.common.io.ByteStreams;
-import com.google.protobuf.BlockingService;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.RpcController;
-import com.google.protobuf.ServiceException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
-import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto;
-import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto;
-import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteRequestProto;
-import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteResponseProto;
-import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto;
-import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto;
-import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
-import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto;
-import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentRequestProto;
-import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentResponseProto;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.ipc.ProtobufRpcEngine;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.hive.llap.security.LlapSecurityHelper;
-import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier;
-import org.apache.hadoop.hive.llap.security.SecretManager;
-import org.apache.hadoop.service.AbstractService;
-import org.apache.hadoop.hive.llap.daemon.ContainerRunner;
-import org.apache.hadoop.hive.llap.daemon.LlapDaemonProtocolBlockingPB;
-import org.apache.hadoop.hive.llap.daemon.LlapManagementProtocolBlockingPB;
-import org.apache.hadoop.hive.llap.security.LlapDaemonPolicyProvider;
-
-public class LlapDaemonProtocolServerImpl extends AbstractService
-    implements LlapDaemonProtocolBlockingPB, LlapManagementProtocolBlockingPB {
-
-  private static final Logger LOG = LoggerFactory.getLogger(LlapDaemonProtocolServerImpl.class);
-
-  private final int numHandlers;
-  private final ContainerRunner containerRunner;
-  private final int srvPort, mngPort;
-  private RPC.Server server, mngServer;
-  private final AtomicReference<InetSocketAddress> srvAddress, mngAddress;
-  private SecretManager zkSecretManager;
-
-  public LlapDaemonProtocolServerImpl(int numHandlers,
-                                      ContainerRunner containerRunner,
-                                      AtomicReference<InetSocketAddress> srvAddress,
-                                      AtomicReference<InetSocketAddress> mngAddress,
-                                      int srvPort,
-                                      int mngPort) {
-    super("LlapDaemonProtocolServerImpl");
-    this.numHandlers = numHandlers;
-    this.containerRunner = containerRunner;
-    this.srvAddress = srvAddress;
-    this.srvPort = srvPort;
-    this.mngAddress = mngAddress;
-    this.mngPort = mngPort;
-    LOG.info("Creating: " + LlapDaemonProtocolServerImpl.class.getSimpleName() +
-        " with port configured to: " + srvPort);
-  }
-
-  @Override
-  public SubmitWorkResponseProto submitWork(RpcController controller,
-                                            SubmitWorkRequestProto request) throws
-      ServiceException {
-    try {
-      return containerRunner.submitWork(request);
-    } catch (IOException e) {
-      throw new ServiceException(e);
-    }
-  }
-
-  @Override
-  public SourceStateUpdatedResponseProto sourceStateUpdated(RpcController controller,
-      SourceStateUpdatedRequestProto request) throws ServiceException {
-    return containerRunner.sourceStateUpdated(request);
-  }
-
-  @Override
-  public QueryCompleteResponseProto queryComplete(RpcController controller,
-      QueryCompleteRequestProto request) throws ServiceException {
-    return containerRunner.queryComplete(request);
-  }
-
-  @Override
-  public TerminateFragmentResponseProto terminateFragment(
-      RpcController controller,
-      TerminateFragmentRequestProto request) throws ServiceException {
-    return containerRunner.terminateFragment(request);
-  }
-
-  @Override
-  public void serviceStart() {
-    final Configuration conf = getConfig();
-    final BlockingService daemonImpl =
-        LlapDaemonProtocolProtos.LlapDaemonProtocol.newReflectiveBlockingService(this);
-    final BlockingService managementImpl =
-        LlapDaemonProtocolProtos.LlapManagementProtocol.newReflectiveBlockingService(this);
-    if (!UserGroupInformation.isSecurityEnabled()) {
-      startProtocolServers(conf, daemonImpl, managementImpl);
-      return;
-    }
-    String llapPrincipal = HiveConf.getVar(conf, ConfVars.LLAP_KERBEROS_PRINCIPAL),
-        llapKeytab = HiveConf.getVar(conf, ConfVars.LLAP_KERBEROS_KEYTAB_FILE);
-    zkSecretManager = SecretManager.createSecretManager(conf, llapPrincipal, llapKeytab);
-
-    // Start the protocol server after properly authenticating with daemon keytab.
-    UserGroupInformation daemonUgi = null;
-    try {
-      daemonUgi = LlapSecurityHelper.loginWithKerberos(llapPrincipal, llapKeytab);
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-    daemonUgi.doAs(new PrivilegedAction<Void>() {
-       @Override
-      public Void run() {
-         startProtocolServers(conf, daemonImpl, managementImpl);
-         return null;
-      }
-    });
-  }
-
-  private void startProtocolServers(
-      Configuration conf, BlockingService daemonImpl, BlockingService managementImpl) {
-    server = startProtocolServer(srvPort, numHandlers, srvAddress, conf, daemonImpl,
-        LlapDaemonProtocolBlockingPB.class);
-    mngServer = startProtocolServer(mngPort, 2, mngAddress, conf, managementImpl,
-        LlapManagementProtocolBlockingPB.class);
-  }
-
-  private RPC.Server startProtocolServer(int srvPort, int numHandlers,
-      AtomicReference<InetSocketAddress> bindAddress, Configuration conf,
-      BlockingService impl, Class<?> protocolClass) {
-    InetSocketAddress addr = new InetSocketAddress(srvPort);
-    RPC.Server server;
-    try {
-      server = createServer(protocolClass, addr, conf, numHandlers, impl);
-      server.start();
-    } catch (IOException e) {
-      LOG.error("Failed to run RPC Server on port: " + srvPort, e);
-      throw new RuntimeException(e);
-    }
-
-    InetSocketAddress serverBindAddress = NetUtils.getConnectAddress(server);
-    bindAddress.set(NetUtils.createSocketAddrForHost(
-        serverBindAddress.getAddress().getCanonicalHostName(),
-        serverBindAddress.getPort()));
-    LOG.info("Instantiated " + protocolClass.getSimpleName() + " at " + bindAddress);
-    return server;
-  }
-
-  @Override
-  public void serviceStop() {
-    if (server != null) {
-      server.stop();
-    }
-    if (mngServer != null) {
-      mngServer.stop();
-    }
-  }
-
-  @InterfaceAudience.Private
-  @VisibleForTesting
-  InetSocketAddress getBindAddress() {
-    return srvAddress.get();
-  }
-
-  private RPC.Server createServer(Class<?> pbProtocol, InetSocketAddress addr, Configuration conf,
-                                  int numHandlers, BlockingService blockingService) throws
-      IOException {
-    RPC.setProtocolEngine(conf, pbProtocol, ProtobufRpcEngine.class);
-    RPC.Builder builder = new RPC.Builder(conf)
-        .setProtocol(pbProtocol)
-        .setInstance(blockingService)
-        .setBindAddress(addr.getHostName())
-        .setPort(addr.getPort())
-        .setNumHandlers(numHandlers);
-    if (zkSecretManager != null) {
-      builder = builder.setSecretManager(zkSecretManager);
-    }
-    RPC.Server server = builder.build();
-
-    if (conf.getBoolean(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, false)) {
-      server.refreshServiceAcl(conf, new LlapDaemonPolicyProvider());
-    }
-    return server;
-  }
-
-
-  @Override
-  public GetTokenResponseProto getDelegationToken(RpcController controller,
-      GetTokenRequestProto request) throws ServiceException {
-    if (zkSecretManager == null) {
-      throw new ServiceException("Operation not supported on unsecure cluster");
-    }
-    UserGroupInformation ugi;
-    try {
-      ugi = UserGroupInformation.getCurrentUser();
-    } catch (IOException e) {
-      throw new ServiceException(e);
-    }
-    String user = ugi.getUserName();
-    Text owner = new Text(user);
-    Text realUser = null;
-    if (ugi.getRealUser() != null) {
-      realUser = new Text(ugi.getRealUser().getUserName());
-    }
-    Text renewer = new Text(ugi.getShortUserName());
-    LlapTokenIdentifier llapId = new LlapTokenIdentifier(owner, renewer, realUser);
-    // TODO: note that the token is not renewable right now and will last for 2 weeks by default.
-    Token<LlapTokenIdentifier> token = new Token<LlapTokenIdentifier>(llapId, zkSecretManager);
-    ByteArrayDataOutput out = ByteStreams.newDataOutput();
-    try {
-      token.write(out);
-    } catch (IOException e) {
-      throw new ServiceException(e);
-    }
-    ByteString bs = ByteString.copyFrom(out.toByteArray());
-    GetTokenResponseProto response = GetTokenResponseProto.newBuilder().setToken(bs).build();
-    return response;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/4185d9b8/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapManagementProtocolClientImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapManagementProtocolClientImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapManagementProtocolClientImpl.java
deleted file mode 100644
index e293a95..0000000
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapManagementProtocolClientImpl.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.llap.daemon.impl;
-
-import javax.annotation.Nullable;
-import javax.net.SocketFactory;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-
-import com.google.protobuf.RpcController;
-import com.google.protobuf.ServiceException;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.retry.RetryPolicy;
-import org.apache.hadoop.ipc.ProtobufRpcEngine;
-import org.apache.hadoop.ipc.ProtocolProxy;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.hive.llap.daemon.LlapManagementProtocolBlockingPB;
-import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto;
-import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto;
-import org.apache.hadoop.security.UserGroupInformation;
-
-public class LlapManagementProtocolClientImpl implements LlapManagementProtocolBlockingPB {
-
-  private final Configuration conf;
-  private final InetSocketAddress serverAddr;
-  private final RetryPolicy retryPolicy;
-  private final SocketFactory socketFactory;
-  LlapManagementProtocolBlockingPB proxy;
-
-
-  public LlapManagementProtocolClientImpl(Configuration conf, String hostname, int port,
-                                      @Nullable RetryPolicy retryPolicy,
-                                      @Nullable SocketFactory socketFactory) {
-    this.conf = conf;
-    this.serverAddr = NetUtils.createSocketAddr(hostname, port);
-    this.retryPolicy = retryPolicy;
-    if (socketFactory == null) {
-      this.socketFactory = NetUtils.getDefaultSocketFactory(conf);
-    } else {
-      this.socketFactory = socketFactory;
-    }
-  }
-
-  public LlapManagementProtocolBlockingPB getProxy() throws IOException {
-    if (proxy == null) {
-      proxy = createProxy();
-    }
-    return proxy;
-  }
-
-  public LlapManagementProtocolBlockingPB createProxy() throws IOException {
-    RPC.setProtocolEngine(conf, LlapManagementProtocolBlockingPB.class, ProtobufRpcEngine.class);
-    ProtocolProxy<LlapManagementProtocolBlockingPB> proxy =
-        RPC.getProtocolProxy(LlapManagementProtocolBlockingPB.class, 0, serverAddr,
-            UserGroupInformation.getCurrentUser(), conf, NetUtils.getDefaultSocketFactory(conf), 0,
-            retryPolicy);
-    return proxy.getProxy();
-  }
-
-  @Override
-  public GetTokenResponseProto getDelegationToken(RpcController controller,
-      GetTokenRequestProto request) throws ServiceException {
-    try {
-      return getProxy().getDelegationToken(null, request);
-    } catch (IOException e) {
-      throw new ServiceException(e);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/4185d9b8/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java
new file mode 100644
index 0000000..c386d77
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.daemon.impl;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.security.PrivilegedAction;
+import java.util.concurrent.atomic.AtomicReference;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.io.ByteArrayDataOutput;
+import com.google.common.io.ByteStreams;
+import com.google.protobuf.BlockingService;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteRequestProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteResponseProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentRequestProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentResponseProto;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.hive.llap.security.LlapSecurityHelper;
+import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier;
+import org.apache.hadoop.hive.llap.security.SecretManager;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.hive.llap.daemon.ContainerRunner;
+import org.apache.hadoop.hive.llap.protocol.LlapProtocolBlockingPB;
+import org.apache.hadoop.hive.llap.protocol.LlapManagementProtocolPB;
+import org.apache.hadoop.hive.llap.security.LlapDaemonPolicyProvider;
+
+public class LlapProtocolServerImpl extends AbstractService
+    implements LlapProtocolBlockingPB, LlapManagementProtocolPB {
+
+  private static final Logger LOG = LoggerFactory.getLogger(LlapProtocolServerImpl.class);
+
+  private final int numHandlers;
+  private final ContainerRunner containerRunner;
+  private final int srvPort, mngPort;
+  private RPC.Server server, mngServer;
+  private final AtomicReference<InetSocketAddress> srvAddress, mngAddress;
+  private SecretManager zkSecretManager;
+
+  public LlapProtocolServerImpl(int numHandlers,
+                                ContainerRunner containerRunner,
+                                AtomicReference<InetSocketAddress> srvAddress,
+                                AtomicReference<InetSocketAddress> mngAddress,
+                                int srvPort,
+                                int mngPort) {
+    super("LlapDaemonProtocolServerImpl");
+    this.numHandlers = numHandlers;
+    this.containerRunner = containerRunner;
+    this.srvAddress = srvAddress;
+    this.srvPort = srvPort;
+    this.mngAddress = mngAddress;
+    this.mngPort = mngPort;
+    LOG.info("Creating: " + LlapProtocolServerImpl.class.getSimpleName() +
+        " with port configured to: " + srvPort);
+  }
+
+  @Override
+  public SubmitWorkResponseProto submitWork(RpcController controller,
+                                            SubmitWorkRequestProto request) throws
+      ServiceException {
+    try {
+      return containerRunner.submitWork(request);
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
+
+  @Override
+  public SourceStateUpdatedResponseProto sourceStateUpdated(RpcController controller,
+      SourceStateUpdatedRequestProto request) throws ServiceException {
+    return containerRunner.sourceStateUpdated(request);
+  }
+
+  @Override
+  public QueryCompleteResponseProto queryComplete(RpcController controller,
+      QueryCompleteRequestProto request) throws ServiceException {
+    return containerRunner.queryComplete(request);
+  }
+
+  @Override
+  public TerminateFragmentResponseProto terminateFragment(
+      RpcController controller,
+      TerminateFragmentRequestProto request) throws ServiceException {
+    return containerRunner.terminateFragment(request);
+  }
+
+  @Override
+  public void serviceStart() {
+    final Configuration conf = getConfig();
+    final BlockingService daemonImpl =
+        LlapDaemonProtocolProtos.LlapDaemonProtocol.newReflectiveBlockingService(this);
+    final BlockingService managementImpl =
+        LlapDaemonProtocolProtos.LlapManagementProtocol.newReflectiveBlockingService(this);
+    if (!UserGroupInformation.isSecurityEnabled()) {
+      startProtocolServers(conf, daemonImpl, managementImpl);
+      return;
+    }
+    String llapPrincipal = HiveConf.getVar(conf, ConfVars.LLAP_KERBEROS_PRINCIPAL),
+        llapKeytab = HiveConf.getVar(conf, ConfVars.LLAP_KERBEROS_KEYTAB_FILE);
+    zkSecretManager = SecretManager.createSecretManager(conf, llapPrincipal, llapKeytab);
+
+    // Start the protocol server after properly authenticating with daemon keytab.
+    UserGroupInformation daemonUgi = null;
+    try {
+      daemonUgi = LlapSecurityHelper.loginWithKerberos(llapPrincipal, llapKeytab);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    daemonUgi.doAs(new PrivilegedAction<Void>() {
+       @Override
+      public Void run() {
+         startProtocolServers(conf, daemonImpl, managementImpl);
+         return null;
+      }
+    });
+  }
+
+  private void startProtocolServers(
+      Configuration conf, BlockingService daemonImpl, BlockingService managementImpl) {
+    server = startProtocolServer(srvPort, numHandlers, srvAddress, conf, daemonImpl,
+        LlapProtocolBlockingPB.class);
+    mngServer = startProtocolServer(mngPort, 2, mngAddress, conf, managementImpl,
+        LlapManagementProtocolPB.class);
+  }
+
+  private RPC.Server startProtocolServer(int srvPort, int numHandlers,
+      AtomicReference<InetSocketAddress> bindAddress, Configuration conf,
+      BlockingService impl, Class<?> protocolClass) {
+    InetSocketAddress addr = new InetSocketAddress(srvPort);
+    RPC.Server server;
+    try {
+      server = createServer(protocolClass, addr, conf, numHandlers, impl);
+      server.start();
+    } catch (IOException e) {
+      LOG.error("Failed to run RPC Server on port: " + srvPort, e);
+      throw new RuntimeException(e);
+    }
+
+    InetSocketAddress serverBindAddress = NetUtils.getConnectAddress(server);
+    bindAddress.set(NetUtils.createSocketAddrForHost(
+        serverBindAddress.getAddress().getCanonicalHostName(),
+        serverBindAddress.getPort()));
+    LOG.info("Instantiated " + protocolClass.getSimpleName() + " at " + bindAddress);
+    return server;
+  }
+
+  @Override
+  public void serviceStop() {
+    if (server != null) {
+      server.stop();
+    }
+    if (mngServer != null) {
+      mngServer.stop();
+    }
+  }
+
+  @InterfaceAudience.Private
+  @VisibleForTesting
+  InetSocketAddress getBindAddress() {
+    return srvAddress.get();
+  }
+
+  private RPC.Server createServer(Class<?> pbProtocol, InetSocketAddress addr, Configuration conf,
+                                  int numHandlers, BlockingService blockingService) throws
+      IOException {
+    RPC.setProtocolEngine(conf, pbProtocol, ProtobufRpcEngine.class);
+    RPC.Builder builder = new RPC.Builder(conf)
+        .setProtocol(pbProtocol)
+        .setInstance(blockingService)
+        .setBindAddress(addr.getHostName())
+        .setPort(addr.getPort())
+        .setNumHandlers(numHandlers);
+    if (zkSecretManager != null) {
+      builder = builder.setSecretManager(zkSecretManager);
+    }
+    RPC.Server server = builder.build();
+
+    if (conf.getBoolean(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, false)) {
+      server.refreshServiceAcl(conf, new LlapDaemonPolicyProvider());
+    }
+    return server;
+  }
+
+
+  @Override
+  public GetTokenResponseProto getDelegationToken(RpcController controller,
+      GetTokenRequestProto request) throws ServiceException {
+    if (zkSecretManager == null) {
+      throw new ServiceException("Operation not supported on unsecure cluster");
+    }
+    UserGroupInformation ugi;
+    try {
+      ugi = UserGroupInformation.getCurrentUser();
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+    String user = ugi.getUserName();
+    Text owner = new Text(user);
+    Text realUser = null;
+    if (ugi.getRealUser() != null) {
+      realUser = new Text(ugi.getRealUser().getUserName());
+    }
+    Text renewer = new Text(ugi.getShortUserName());
+    LlapTokenIdentifier llapId = new LlapTokenIdentifier(owner, renewer, realUser);
+    // TODO: note that the token is not renewable right now and will last for 2 weeks by default.
+    Token<LlapTokenIdentifier> token = new Token<LlapTokenIdentifier>(llapId, zkSecretManager);
+    ByteArrayDataOutput out = ByteStreams.newDataOutput();
+    try {
+      token.write(out);
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+    ByteString bs = ByteString.copyFrom(out.toByteArray());
+    GetTokenResponseProto response = GetTokenResponseProto.newBuilder().setToken(bs).build();
+    return response;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/4185d9b8/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryFragmentInfo.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryFragmentInfo.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryFragmentInfo.java
index aa065a9..480a394 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryFragmentInfo.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryFragmentInfo.java
@@ -23,7 +23,7 @@ import org.apache.hadoop.hive.llap.daemon.FinishableStateUpdateHandler;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.IOSpecProto;
-import org.apache.tez.mapreduce.input.MRInputLegacy;
+import org.apache.hadoop.hive.llap.tezplugins.LlapTezUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -89,7 +89,7 @@ public class QueryFragmentInfo {
     boolean canFinish = true;
     if (inputSpecList != null && !inputSpecList.isEmpty()) {
       for (IOSpecProto inputSpec : inputSpecList) {
-        if (isSourceOfInterest(inputSpec)) {
+        if (LlapTezUtils.isSourceOfInterest(inputSpec.getIoDescriptor().getClassName())) {
           // Lookup the state in the map.
           LlapDaemonProtocolProtos.SourceStateProto state = queryInfo.getSourceStateMap()
               .get(inputSpec.getConnectedVertexName());
@@ -129,7 +129,7 @@ public class QueryFragmentInfo {
     List<IOSpecProto> inputSpecList = fragmentSpec.getInputSpecsList();
     if (inputSpecList != null && !inputSpecList.isEmpty()) {
       for (IOSpecProto inputSpec : inputSpecList) {
-        if (isSourceOfInterest(inputSpec)) {
+        if (LlapTezUtils.isSourceOfInterest(inputSpec.getIoDescriptor().getClassName())) {
           sourcesOfInterest.add(inputSpec.getConnectedVertexName());
         }
       }
@@ -143,13 +143,6 @@ public class QueryFragmentInfo {
     queryInfo.unregisterFinishableStateUpdate(handler);
   }
 
-  private boolean isSourceOfInterest(IOSpecProto inputSpec) {
-    String inputClassName = inputSpec.getIoDescriptor().getClassName();
-    // MRInput is not of interest since it'll always be ready.
-    return !inputClassName.equals(MRInputLegacy.class.getName());
-  }
-
-
   @Override
   public boolean equals(Object o) {
     if (this == o) {

http://git-wip-us.apache.org/repos/asf/hive/blob/4185d9b8/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
index ede2a03..d88d82a 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
@@ -38,7 +38,7 @@ import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.IOSpecPro
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
 import org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorMetrics;
 import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol;
-import org.apache.hadoop.hive.llap.tezplugins.Converters;
+import org.apache.hadoop.hive.llap.tez.Converters;
 import org.apache.hadoop.hive.ql.io.IOContextMap;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.net.NetUtils;

http://git-wip-us.apache.org/repos/asf/hive/blob/4185d9b8/llap-server/src/java/org/apache/hadoop/hive/llap/protocol/LlapTaskUmbilicalProtocol.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/protocol/LlapTaskUmbilicalProtocol.java b/llap-server/src/java/org/apache/hadoop/hive/llap/protocol/LlapTaskUmbilicalProtocol.java
deleted file mode 100644
index 9549567..0000000
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/protocol/LlapTaskUmbilicalProtocol.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- *  you may not use this file except in compliance with the License.
- *  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.hadoop.hive.llap.protocol;
-
-import java.io.IOException;
-
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.ipc.VersionedProtocol;
-import org.apache.hadoop.security.token.TokenInfo;
-import org.apache.tez.dag.api.TezException;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.runtime.api.impl.TezHeartbeatRequest;
-import org.apache.tez.runtime.api.impl.TezHeartbeatResponse;
-import org.apache.tez.runtime.common.security.JobTokenSelector;
-
-@TokenInfo(JobTokenSelector.class)
-public interface LlapTaskUmbilicalProtocol extends VersionedProtocol {
-
-  public static final long versionID = 1L;
-
-  // From Tez. Eventually changes over to the LLAP protocol and ProtocolBuffers
-  boolean canCommit(TezTaskAttemptID taskid) throws IOException;
-  public TezHeartbeatResponse heartbeat(TezHeartbeatRequest request)
-      throws IOException, TezException;
-
-  public void nodeHeartbeat(Text hostname, int port) throws IOException;
-
-  public void taskKilled(TezTaskAttemptID taskAttemptId) throws IOException;
-
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/4185d9b8/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapDaemonPolicyProvider.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapDaemonPolicyProvider.java b/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapDaemonPolicyProvider.java
index d67647b..bedd265 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapDaemonPolicyProvider.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapDaemonPolicyProvider.java
@@ -18,17 +18,17 @@
 package org.apache.hadoop.hive.llap.security;
 
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.llap.daemon.LlapDaemonProtocolBlockingPB;
-import org.apache.hadoop.hive.llap.daemon.LlapManagementProtocolBlockingPB;
+import org.apache.hadoop.hive.llap.protocol.LlapProtocolBlockingPB;
+import org.apache.hadoop.hive.llap.protocol.LlapManagementProtocolPB;
 import org.apache.hadoop.security.authorize.PolicyProvider;
 import org.apache.hadoop.security.authorize.Service;
 
 public class LlapDaemonPolicyProvider extends PolicyProvider {
   private static final Service[] services = new Service[] {
     new Service(HiveConf.ConfVars.LLAP_SECURITY_ACL.varname,
-        LlapDaemonProtocolBlockingPB.class),
+        LlapProtocolBlockingPB.class),
     new Service(HiveConf.ConfVars.LLAP_MANAGEMENT_ACL.varname,
-        LlapManagementProtocolBlockingPB.class)
+        LlapManagementProtocolPB.class)
   };
 
   @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/4185d9b8/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapSecurityHelper.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapSecurityHelper.java b/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapSecurityHelper.java
index a00b631..aa8745d 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapSecurityHelper.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapSecurityHelper.java
@@ -27,7 +27,7 @@ import java.util.concurrent.TimeUnit;
 import javax.net.SocketFactory;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.llap.daemon.impl.LlapManagementProtocolClientImpl;
+import org.apache.hadoop.hive.llap.impl.LlapManagementProtocolClientImpl;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto;
 import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
 import org.apache.hadoop.hive.llap.registry.ServiceInstance;

http://git-wip-us.apache.org/repos/asf/hive/blob/4185d9b8/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapServerSecurityInfo.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapServerSecurityInfo.java b/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapServerSecurityInfo.java
index 4dca2ce..eb514f2 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapServerSecurityInfo.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapServerSecurityInfo.java
@@ -20,8 +20,8 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.llap.daemon.LlapDaemonProtocolBlockingPB;
-import org.apache.hadoop.hive.llap.daemon.LlapManagementProtocolBlockingPB;
+import org.apache.hadoop.hive.llap.protocol.LlapProtocolBlockingPB;
+import org.apache.hadoop.hive.llap.protocol.LlapManagementProtocolPB;
 import org.apache.hadoop.security.KerberosInfo;
 import org.apache.hadoop.security.SecurityInfo;
 import org.apache.hadoop.security.token.TokenIdentifier;
@@ -36,8 +36,8 @@ public class LlapServerSecurityInfo extends SecurityInfo {
     if (LOG.isDebugEnabled()) {
       LOG.debug("Trying to get KerberosInfo for " + protocol);
     }
-    if (!LlapDaemonProtocolBlockingPB.class.isAssignableFrom(protocol)
-        && !LlapManagementProtocolBlockingPB.class.isAssignableFrom(protocol)) return null;
+    if (!LlapProtocolBlockingPB.class.isAssignableFrom(protocol)
+        && !LlapManagementProtocolPB.class.isAssignableFrom(protocol)) return null;
     return new KerberosInfo() {
       @Override
       public Class<? extends Annotation> annotationType() {
@@ -62,7 +62,7 @@ public class LlapServerSecurityInfo extends SecurityInfo {
       LOG.debug("Trying to get TokenInfo for " + protocol);
     }
     // Tokens cannot be used for the management protocol (for now).
-    if (!LlapDaemonProtocolBlockingPB.class.isAssignableFrom(protocol)) return null;
+    if (!LlapProtocolBlockingPB.class.isAssignableFrom(protocol)) return null;
     return new TokenInfo() {
       @Override
       public Class<? extends Annotation> annotationType() {

http://git-wip-us.apache.org/repos/asf/hive/blob/4185d9b8/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapTokenSelector.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapTokenSelector.java b/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapTokenSelector.java
deleted file mode 100644
index b6e7499..0000000
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapTokenSelector.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.llap.security;
-
-import java.util.Collection;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.security.token.TokenSelector;
-
-public class LlapTokenSelector implements TokenSelector<LlapTokenIdentifier> {
-  private static final Log LOG = LogFactory.getLog(LlapTokenSelector.class);
-
-  @Override
-  public Token<LlapTokenIdentifier> selectToken(Text service,
-      Collection<Token<? extends TokenIdentifier>> tokens) {
-    if (service == null) return null;
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Looking for a token with service " + service);
-    }
-    for (Token<? extends TokenIdentifier> token : tokens) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Token = " + token.getKind() + "; service = " + token.getService());
-      }
-      if (LlapTokenIdentifier.KIND_NAME.equals(token.getKind())
-          && service.equals(token.getService())) {
-        @SuppressWarnings("unchecked")
-        Token<LlapTokenIdentifier> result = (Token<LlapTokenIdentifier>)token;
-        return result;
-      }
-    }
-    return null;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/4185d9b8/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/Converters.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/Converters.java b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/Converters.java
deleted file mode 100644
index f61d62f..0000000
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/Converters.java
+++ /dev/null
@@ -1,265 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.llap.tezplugins;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import com.google.protobuf.ByteString;
-import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.EntityDescriptorProto;
-import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GroupInputSpecProto;
-import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.IOSpecProto;
-import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto;
-import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateProto;
-import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.UserPayloadProto;
-import org.apache.tez.common.TezCommonUtils;
-import org.apache.tez.dag.api.EntityDescriptor;
-import org.apache.tez.dag.api.InputDescriptor;
-import org.apache.tez.dag.api.OutputDescriptor;
-import org.apache.tez.dag.api.ProcessorDescriptor;
-import org.apache.tez.dag.api.TezUncheckedException;
-import org.apache.tez.dag.api.UserPayload;
-import org.apache.tez.dag.api.event.VertexState;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.runtime.api.impl.GroupInputSpec;
-import org.apache.tez.runtime.api.impl.InputSpec;
-import org.apache.tez.runtime.api.impl.OutputSpec;
-import org.apache.tez.runtime.api.impl.TaskSpec;
-
-public class Converters {
-
-  public static TaskSpec getTaskSpecfromProto(FragmentSpecProto FragmentSpecProto) {
-    TezTaskAttemptID taskAttemptID =
-        TezTaskAttemptID.fromString(FragmentSpecProto.getFragmentIdentifierString());
-
-    ProcessorDescriptor processorDescriptor = null;
-    if (FragmentSpecProto.hasProcessorDescriptor()) {
-      processorDescriptor = convertProcessorDescriptorFromProto(
-          FragmentSpecProto.getProcessorDescriptor());
-    }
-
-    List<InputSpec> inputSpecList = new ArrayList<InputSpec>(FragmentSpecProto.getInputSpecsCount());
-    if (FragmentSpecProto.getInputSpecsCount() > 0) {
-      for (IOSpecProto inputSpecProto : FragmentSpecProto.getInputSpecsList()) {
-        inputSpecList.add(getInputSpecFromProto(inputSpecProto));
-      }
-    }
-
-    List<OutputSpec> outputSpecList =
-        new ArrayList<OutputSpec>(FragmentSpecProto.getOutputSpecsCount());
-    if (FragmentSpecProto.getOutputSpecsCount() > 0) {
-      for (IOSpecProto outputSpecProto : FragmentSpecProto.getOutputSpecsList()) {
-        outputSpecList.add(getOutputSpecFromProto(outputSpecProto));
-      }
-    }
-
-    List<GroupInputSpec> groupInputSpecs =
-        new ArrayList<GroupInputSpec>(FragmentSpecProto.getGroupedInputSpecsCount());
-    if (FragmentSpecProto.getGroupedInputSpecsCount() > 0) {
-      for (GroupInputSpecProto groupInputSpecProto : FragmentSpecProto.getGroupedInputSpecsList()) {
-        groupInputSpecs.add(getGroupInputSpecFromProto(groupInputSpecProto));
-      }
-    }
-
-    TaskSpec taskSpec =
-        new TaskSpec(taskAttemptID, FragmentSpecProto.getDagName(), FragmentSpecProto.getVertexName(),
-            FragmentSpecProto.getVertexParallelism(), processorDescriptor, inputSpecList,
-            outputSpecList, groupInputSpecs);
-    return taskSpec;
-  }
-
-  public static FragmentSpecProto convertTaskSpecToProto(TaskSpec taskSpec) {
-    FragmentSpecProto.Builder builder = FragmentSpecProto.newBuilder();
-    builder.setFragmentIdentifierString(taskSpec.getTaskAttemptID().toString());
-    builder.setDagName(taskSpec.getDAGName());
-    builder.setVertexName(taskSpec.getVertexName());
-    builder.setVertexParallelism(taskSpec.getVertexParallelism());
-    builder.setFragmentNumber(taskSpec.getTaskAttemptID().getTaskID().getId());
-    builder.setAttemptNumber(taskSpec.getTaskAttemptID().getId());
-
-    if (taskSpec.getProcessorDescriptor() != null) {
-      builder.setProcessorDescriptor(
-          convertToProto(taskSpec.getProcessorDescriptor()));
-    }
-
-    if (taskSpec.getInputs() != null && !taskSpec.getInputs().isEmpty()) {
-      for (InputSpec inputSpec : taskSpec.getInputs()) {
-        builder.addInputSpecs(convertInputSpecToProto(inputSpec));
-      }
-    }
-
-    if (taskSpec.getOutputs() != null && !taskSpec.getOutputs().isEmpty()) {
-      for (OutputSpec outputSpec : taskSpec.getOutputs()) {
-        builder.addOutputSpecs(convertOutputSpecToProto(outputSpec));
-      }
-    }
-
-    if (taskSpec.getGroupInputs() != null && !taskSpec.getGroupInputs().isEmpty()) {
-      for (GroupInputSpec groupInputSpec : taskSpec.getGroupInputs()) {
-        builder.addGroupedInputSpecs(convertGroupInputSpecToProto(groupInputSpec));
-
-      }
-    }
-    return builder.build();
-  }
-
-  private static ProcessorDescriptor convertProcessorDescriptorFromProto(
-      EntityDescriptorProto proto) {
-    String className = proto.getClassName();
-    UserPayload payload = convertPayloadFromProto(proto);
-    ProcessorDescriptor pd = ProcessorDescriptor.create(className);
-    setUserPayload(pd, payload);
-    return pd;
-  }
-
-  private static EntityDescriptorProto convertToProto(
-      EntityDescriptor<?> descriptor) {
-    EntityDescriptorProto.Builder builder = EntityDescriptorProto
-        .newBuilder();
-    builder.setClassName(descriptor.getClassName());
-
-    UserPayload userPayload = descriptor.getUserPayload();
-    if (userPayload != null) {
-      UserPayloadProto.Builder payloadBuilder = UserPayloadProto.newBuilder();
-      if (userPayload.hasPayload()) {
-        payloadBuilder.setUserPayload(ByteString.copyFrom(userPayload.getPayload()));
-        payloadBuilder.setVersion(userPayload.getVersion());
-      }
-      builder.setUserPayload(payloadBuilder.build());
-    }
-    if (descriptor.getHistoryText() != null) {
-      try {
-        builder.setHistoryText(TezCommonUtils.compressByteArrayToByteString(
-            descriptor.getHistoryText().getBytes("UTF-8")));
-      } catch (IOException e) {
-        throw new TezUncheckedException(e);
-      }
-    }
-    return builder.build();
-  }
-
-  /**
-   * Rebuilds an {@code InputSpec} (source vertex name, optional input
-   * descriptor, physical edge count) from its protobuf form. The descriptor
-   * is {@code null} when the proto carries none.
-   */
-  private static InputSpec getInputSpecFromProto(IOSpecProto inputSpecProto) {
-    InputDescriptor inputDescriptor = null;
-    if (inputSpecProto.hasIoDescriptor()) {
-      inputDescriptor =
-          convertInputDescriptorFromProto(inputSpecProto.getIoDescriptor());
-    }
-    InputSpec inputSpec = new InputSpec(inputSpecProto.getConnectedVertexName(), inputDescriptor,
-        inputSpecProto.getPhysicalEdgeCount());
-    return inputSpec;
-  }
-
-  /**
-   * Rebuilds a Tez {@code InputDescriptor} from its protobuf form: the class
-   * name plus the optional user payload (payload left unset when absent).
-   */
-  private static InputDescriptor convertInputDescriptorFromProto(
-      EntityDescriptorProto proto) {
-    String className = proto.getClassName();
-    UserPayload payload = convertPayloadFromProto(proto);
-    InputDescriptor id = InputDescriptor.create(className);
-    setUserPayload(id, payload);
-    return id;
-  }
-
-  /**
-   * Rebuilds a Tez {@code OutputDescriptor} from its protobuf form: the class
-   * name plus the optional user payload (payload left unset when absent).
-   */
-  private static OutputDescriptor convertOutputDescriptorFromProto(
-      EntityDescriptorProto proto) {
-    String className = proto.getClassName();
-    UserPayload payload = convertPayloadFromProto(proto);
-    OutputDescriptor od = OutputDescriptor.create(className);
-    setUserPayload(od, payload);
-    return od;
-  }
-
-  /**
-   * Serializes an {@code InputSpec} to {@code IOSpecProto}. Source vertex
-   * name and descriptor are optional fields and are skipped when null; the
-   * physical edge count is always written.
-   */
-  private static IOSpecProto convertInputSpecToProto(InputSpec inputSpec) {
-    IOSpecProto.Builder builder = IOSpecProto.newBuilder();
-    if (inputSpec.getSourceVertexName() != null) {
-      builder.setConnectedVertexName(inputSpec.getSourceVertexName());
-    }
-    if (inputSpec.getInputDescriptor() != null) {
-      builder.setIoDescriptor(convertToProto(inputSpec.getInputDescriptor()));
-    }
-    builder.setPhysicalEdgeCount(inputSpec.getPhysicalEdgeCount());
-    return builder.build();
-  }
-
-  /**
-   * Rebuilds an {@code OutputSpec} (destination vertex name, optional output
-   * descriptor, physical edge count) from its protobuf form. The descriptor
-   * is {@code null} when the proto carries none.
-   */
-  private static OutputSpec getOutputSpecFromProto(IOSpecProto outputSpecProto) {
-    OutputDescriptor outputDescriptor = null;
-    if (outputSpecProto.hasIoDescriptor()) {
-      outputDescriptor =
-          convertOutputDescriptorFromProto(outputSpecProto.getIoDescriptor());
-    }
-    OutputSpec outputSpec =
-        new OutputSpec(outputSpecProto.getConnectedVertexName(), outputDescriptor,
-            outputSpecProto.getPhysicalEdgeCount());
-    return outputSpec;
-  }
-
-  /**
-   * Serializes an {@code OutputSpec} to {@code IOSpecProto}. Destination
-   * vertex name and descriptor are optional fields and are skipped when null;
-   * the physical edge count is always written. Public — NOTE(review): the
-   * only public converter here besides {@code fromVertexState}; presumably
-   * called directly by external code, confirm before narrowing access.
-   */
-  public static IOSpecProto convertOutputSpecToProto(OutputSpec outputSpec) {
-    IOSpecProto.Builder builder = IOSpecProto.newBuilder();
-    if (outputSpec.getDestinationVertexName() != null) {
-      builder.setConnectedVertexName(outputSpec.getDestinationVertexName());
-    }
-    if (outputSpec.getOutputDescriptor() != null) {
-      builder.setIoDescriptor(convertToProto(outputSpec.getOutputDescriptor()));
-    }
-    builder.setPhysicalEdgeCount(outputSpec.getPhysicalEdgeCount());
-    return builder.build();
-  }
-
-  /**
-   * Rebuilds a {@code GroupInputSpec} (group name, member vertices, merged
-   * input descriptor) from its protobuf form. Unlike the IOSpec converters,
-   * the merged descriptor is read unconditionally — the proto field is
-   * assumed to always be set for grouped inputs.
-   */
-  private static GroupInputSpec getGroupInputSpecFromProto(GroupInputSpecProto groupInputSpecProto) {
-    GroupInputSpec groupSpec = new GroupInputSpec(groupInputSpecProto.getGroupName(),
-        groupInputSpecProto.getGroupVerticesList(),
-        convertInputDescriptorFromProto(groupInputSpecProto.getMergedInputDescriptor()));
-    return groupSpec;
-  }
-
-  /**
-   * Serializes a {@code GroupInputSpec} to its protobuf form. All three
-   * fields (name, member vertices, merged descriptor) are written
-   * unconditionally, mirroring {@code getGroupInputSpecFromProto}.
-   */
-  private static GroupInputSpecProto convertGroupInputSpecToProto(GroupInputSpec groupInputSpec) {
-    GroupInputSpecProto.Builder builder = GroupInputSpecProto.newBuilder();
-    builder.setGroupName(groupInputSpec.getGroupName());
-    builder.addAllGroupVertices(groupInputSpec.getGroupVertices());
-    builder.setMergedInputDescriptor(convertToProto(groupInputSpec.getMergedInputDescriptor()));
-    return builder.build();
-  }
-
-
-  /**
-   * Null-guarded payload setter shared by the descriptor converters.
-   * NOTE(review): the guard suggests {@code EntityDescriptor.setUserPayload}
-   * does not accept null — confirm against the Tez API before removing it.
-   */
-  private static void setUserPayload(EntityDescriptor<?> entity, UserPayload payload) {
-    if (payload != null) {
-      entity.setUserPayload(payload);
-    }
-  }
-
-  /**
-   * Extracts the {@code UserPayload} from an {@code EntityDescriptorProto}.
-   * Three-way result: {@code null} when the proto has no payload field; a
-   * payload wrapping a read-only view of the bytes plus the version when
-   * bytes are present; an empty payload ({@code UserPayload.create(null)})
-   * when the payload field exists but carries no bytes. Mirrors the
-   * present/empty/absent distinction written by {@code convertToProto}.
-   */
-  private static UserPayload convertPayloadFromProto(
-      EntityDescriptorProto proto) {
-    UserPayload userPayload = null;
-    if (proto.hasUserPayload()) {
-      if (proto.getUserPayload().hasUserPayload()) {
-        // asReadOnlyByteBuffer avoids copying the ByteString's backing array.
-        userPayload =
-            UserPayload.create(proto.getUserPayload().getUserPayload().asReadOnlyByteBuffer(), proto.getUserPayload().getVersion());
-      } else {
-        userPayload = UserPayload.create(null);
-      }
-    }
-    return userPayload;
-  }
-
-  /**
-   * Maps a Tez {@code VertexState} to the wire-level {@code SourceStateProto}.
-   * Only SUCCEEDED and RUNNING are representable; any other state throws,
-   * making unexpected source-state notifications fail loudly rather than be
-   * silently misreported.
-   *
-   * @throws RuntimeException for any state other than SUCCEEDED or RUNNING
-   */
-  public static SourceStateProto fromVertexState(VertexState state) {
-    switch (state) {
-      case SUCCEEDED:
-        return SourceStateProto.S_SUCCEEDED;
-      case RUNNING:
-        return SourceStateProto.S_RUNNING;
-      default:
-        throw new RuntimeException("Unexpected state: " + state);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/4185d9b8/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapContainerLauncher.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapContainerLauncher.java b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapContainerLauncher.java
deleted file mode 100644
index 07703a2..0000000
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapContainerLauncher.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.llap.tezplugins;
-
-import org.apache.tez.serviceplugins.api.ContainerLaunchRequest;
-import org.apache.tez.serviceplugins.api.ContainerLauncher;
-import org.apache.tez.serviceplugins.api.ContainerLauncherContext;
-import org.apache.tez.serviceplugins.api.ContainerStopRequest;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A no-op Tez {@code ContainerLauncher} plugin: launch requests are
- * immediately reported as successful without starting any process, and stop
- * requests only notify the context. Presumably the actual execution happens
- * inside already-running LLAP daemons, so there is nothing to launch or
- * stop per-container — confirm against the LLAP task scheduler/communicator.
- */
-public class LlapContainerLauncher extends ContainerLauncher {
-  private static final Logger LOG = LoggerFactory.getLogger(LlapContainerLauncher.class);
-
-  public LlapContainerLauncher(ContainerLauncherContext containerLauncherContext) {
-    super(containerLauncherContext);
-  }
-
-  // Reports the container as launched without doing any work.
-  @Override
-  public void launchContainer(ContainerLaunchRequest containerLaunchRequest) {
-    LOG.info("No-op launch for container: " + containerLaunchRequest.getContainerId() +
-        " succeeded on host: " + containerLaunchRequest.getNodeId());
-    getContext().containerLaunched(containerLaunchRequest.getContainerId());
-  }
-
-  // Acknowledges the stop request to the context; no process is killed here.
-  @Override
-  public void stopContainer(ContainerStopRequest containerStopRequest) {
-    LOG.info("DEBUG: Ignoring STOP_REQUEST for event: " + containerStopRequest);
-    getContext().containerStopRequested(containerStopRequest.getContainerId());
-  }
-}
\ No newline at end of file


Mime
View raw message