hive-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject svn commit: r1672517 [2/2] - in /hive/branches/llap/llap-server: ./ src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/ src/java/org/apache/hadoop/hive/llap/daemon/ src/java/org/apache/hadoop/hive/llap/daemon/impl/ src/java/org/apache/had...
Date Thu, 09 Apr 2015 23:12:16 GMT
Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolClientImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolClientImpl.java?rev=1672517&r1=1672516&r2=1672517&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolClientImpl.java
(original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolClientImpl.java
Thu Apr  9 23:12:15 2015
@@ -20,7 +20,10 @@ import java.net.InetSocketAddress;
 import com.google.protobuf.RpcController;
 import com.google.protobuf.ServiceException;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.net.NetUtils;
@@ -42,14 +45,25 @@ public class LlapDaemonProtocolClientImp
   }
 
   @Override
-  public LlapDaemonProtocolProtos.SubmitWorkResponseProto submitWork(RpcController controller,
-                                                                     LlapDaemonProtocolProtos.SubmitWorkRequestProto
request) throws
+  public SubmitWorkResponseProto submitWork(RpcController controller,
+                                                                     SubmitWorkRequestProto
request) throws
       ServiceException {
     try {
       return getProxy().submitWork(null, request);
     } catch (IOException e) {
       throw new ServiceException(e);
     }
+  }
+
+  @Override
+  public SourceStateUpdatedResponseProto sourceStateUpdated(RpcController controller,
+                                                            SourceStateUpdatedRequestProto
request) throws
+      ServiceException {
+    try {
+      return getProxy().sourceStateUpdated(null, request);
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
   }
 
   public LlapDaemonProtocolBlockingPB getProxy() throws IOException {

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolServerImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolServerImpl.java?rev=1672517&r1=1672516&r2=1672517&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolServerImpl.java
(original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolServerImpl.java
Thu Apr  9 23:12:15 2015
@@ -27,6 +27,9 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.RPC;
@@ -62,7 +65,7 @@ public class LlapDaemonProtocolServerImp
 
   @Override
   public SubmitWorkResponseProto submitWork(RpcController controller,
-                                            LlapDaemonProtocolProtos.SubmitWorkRequestProto
request) throws
+                                            SubmitWorkRequestProto request) throws
       ServiceException {
     try {
       containerRunner.submitWork(request);
@@ -73,6 +76,14 @@ public class LlapDaemonProtocolServerImp
   }
 
  /**
   * Server-side handler for source (vertex) state change notifications from the AM.
   * Forwards the update to the container runner and returns the empty ack message.
   *
   * NOTE(review): unlike {@code submitWork}, no exception is caught and wrapped in a
   * ServiceException here — confirm {@code ContainerRunner.sourceStateUpdated} cannot throw,
   * otherwise an unchecked exception escapes the RPC layer raw.
   */
  @Override
  public SourceStateUpdatedResponseProto sourceStateUpdated(RpcController controller,
      SourceStateUpdatedRequestProto request) throws ServiceException {
    containerRunner.sourceStateUpdated(request);
    // Response message has no fields; the shared default instance suffices.
    return SourceStateUpdatedResponseProto.getDefaultInstance();
  }
+
+  @Override
   public void serviceStart() {
     Configuration conf = getConfig();
 

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/Converters.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/Converters.java?rev=1672517&r1=1672516&r2=1672517&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/Converters.java
(original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/Converters.java
Thu Apr  9 23:12:15 2015
@@ -23,6 +23,7 @@ import org.apache.hadoop.hive.llap.daemo
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GroupInputSpecProto;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.IOSpecProto;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateProto;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.UserPayloadProto;
 import org.apache.tez.common.TezCommonUtils;
 import org.apache.tez.dag.api.EntityDescriptor;
@@ -31,6 +32,7 @@ import org.apache.tez.dag.api.OutputDesc
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.dag.api.event.VertexState;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.runtime.api.impl.GroupInputSpec;
 import org.apache.tez.runtime.api.impl.InputSpec;
@@ -250,4 +252,15 @@ public class Converters {
     return userPayload;
   }
 
+  public static SourceStateProto fromVertexState(VertexState state) {
+    switch (state) {
+      case SUCCEEDED:
+        return SourceStateProto.S_SUCCEEDED;
+      case RUNNING:
+        return SourceStateProto.S_RUNNING;
+      default:
+        throw new RuntimeException("Unexpected state: " + state);
+    }
+  }
+
 }

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java?rev=1672517&r1=1672516&r2=1672517&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
(original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
Thu Apr  9 23:12:15 2015
@@ -31,9 +31,12 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.llap.LlapNodeId;
 import org.apache.hadoop.hive.llap.configuration.LlapConfiguration;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto;
 import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol;
+import org.apache.hadoop.hive.llap.tezplugins.helpers.SourceStateTracker;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ipc.ProtocolSignature;
@@ -51,6 +54,7 @@ import org.apache.tez.dag.api.TaskCommun
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
 import org.apache.tez.dag.app.TezTaskCommunicatorImpl;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.runtime.api.impl.TaskSpec;
@@ -64,11 +68,16 @@ public class LlapTaskCommunicator extend
   private final SubmitWorkRequestProto BASE_SUBMIT_WORK_REQUEST;
   private final ConcurrentMap<String, ByteBuffer> credentialMap;
 
+  // Tracks containerIds and taskAttemptIds, so can be kept independent of the running DAG.
+  // When DAG specific cleanup happens, it'll be better to link this to a DAG though.
   private final EntityTracker entityTracker = new EntityTracker();
+  private final SourceStateTracker sourceStateTracker;
 
   private TaskCommunicator communicator;
   private final LlapTaskUmbilicalProtocol umbilical;
 
+  private volatile String currentDagName;
+
   public LlapTaskCommunicator(
       TaskCommunicatorContext taskCommunicatorContext) {
     super(taskCommunicatorContext);
@@ -87,6 +96,7 @@ public class LlapTaskCommunicator extend
     BASE_SUBMIT_WORK_REQUEST = baseBuilder.build();
 
     credentialMap = new ConcurrentHashMap<>();
+    sourceStateTracker = new SourceStateTracker(getTaskCommunicatorContext(), this);
   }
 
   @Override
@@ -153,6 +163,7 @@ public class LlapTaskCommunicator extend
     entityTracker.unregisterContainer(containerId);
   }
 
+
   @Override
   public void registerRunningTaskAttempt(final ContainerId containerId, final TaskSpec taskSpec,
                                          Map<String, LocalResource> additionalResources,
@@ -161,6 +172,9 @@ public class LlapTaskCommunicator extend
                                          int priority)  {
     super.registerRunningTaskAttempt(containerId, taskSpec, additionalResources, credentials,
         credentialsChanged, priority);
+    if (taskSpec.getDAGName() != currentDagName) {
+      resetCurrentDag(taskSpec.getDAGName());
+    }
 
     SubmitWorkRequestProto requestProto;
     try {
@@ -184,6 +198,8 @@ public class LlapTaskCommunicator extend
 
     entityTracker.registerTaskAttempt(containerId, taskSpec.getTaskAttemptID(), host, port);
 
+    sourceStateTracker.addTask(host, port, taskSpec.getInputs());
+
     // Have to register this up front right now. Otherwise, it's possible for the task to
start
     // sending out status/DONE/KILLED/FAILED messages before TAImpl knows how to handle them.
     getTaskCommunicatorContext()
@@ -240,6 +256,45 @@ public class LlapTaskCommunicator extend
     // be told that it needs to die since it isn't recognized.
   }
 
+  @Override
+  public void onVertexStateUpdated(VertexStateUpdate vertexStateUpdate) {
+    // Delegate updates over to the source state tracker.
+    sourceStateTracker
+        .sourceStateUpdated(vertexStateUpdate.getVertexName(), vertexStateUpdate.getVertexState());
+  }
+
  /**
   * Sends a source-state update to the daemon at {@code host:port}, asynchronously via the
   * underlying {@link TaskCommunicator}. Fire-and-forget: a successful ack carries no payload
   * and needs no handling; failures are currently only logged.
   */
  public void sendStateUpdate(final String host, final int port,
                              final SourceStateUpdatedRequestProto request) {
    communicator.sendSourceStateUpdate(request, host, port,
        new TaskCommunicator.ExecuteRequestCallback<SourceStateUpdatedResponseProto>() {
          @Override
          public void setResponse(SourceStateUpdatedResponseProto response) {
            // Response message is empty; nothing to do on success.
          }

          @Override
          public void indicateError(Throwable t) {
            // TODO HIVE-10280.
            // Ideally, this should be retried for a while, after which the node should be
            // marked as failed.
            // Considering tasks are supposed to run fast. Failing the task immediately may
            // be a good option.
            LOG.error(
                "Failed to send state update to node: " + host + ":" + port + ", StateUpdate=" +
                    request, t);
          }
        });
  }
+
+
+
  /**
   * Switches the communicator (and the source state tracker) over to a newly-seen DAG,
   * discarding per-DAG tracking state.
   */
  private void resetCurrentDag(String newDagName) {
    // Working on the assumption that a single DAG runs at a time per AM.
    currentDagName = newDagName;
    sourceStateTracker.resetState(newDagName);
    LOG.info("CurrentDag set to: " + newDagName);
    // TODO Additional state reset. Potentially sending messages to node to reset.
    // Is it possible for heartbeats to come in from lost tasks - those should be told to die, which
    // is likely already happening.
  }
+
   private SubmitWorkRequestProto constructSubmitWorkRequest(ContainerId containerId,
                                                             TaskSpec taskSpec) throws
       IOException {
@@ -411,4 +466,4 @@ public class LlapTaskCommunicator extend
       }
     }
   }
-}
+}
\ No newline at end of file

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/TaskCommunicator.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/TaskCommunicator.java?rev=1672517&r1=1672516&r2=1672517&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/TaskCommunicator.java
(original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/TaskCommunicator.java
Thu Apr  9 23:12:15 2015
@@ -27,14 +27,20 @@ import com.google.common.util.concurrent
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.protobuf.Message;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.llap.daemon.LlapDaemonProtocolBlockingPB;
 import org.apache.hadoop.hive.llap.daemon.impl.LlapDaemonProtocolClientImpl;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto;
 import org.apache.hadoop.service.AbstractService;
 
 public class TaskCommunicator extends AbstractService {
 
+  private static final Log LOG = LogFactory.getLog(TaskCommunicator.class);
+
   private final ConcurrentMap<String, LlapDaemonProtocolBlockingPB> hostProxies;
   private ListeningExecutorService executor;
 
@@ -53,7 +59,7 @@ public class TaskCommunicator extends Ab
 
   public void submitWork(SubmitWorkRequestProto request, String host, int port,
                          final ExecuteRequestCallback<SubmitWorkResponseProto> callback)
{
-    ListenableFuture<SubmitWorkResponseProto> future = executor.submit(new SubmitWorkCallable(request,
host, port));
+    ListenableFuture<SubmitWorkResponseProto> future = executor.submit(new SubmitWorkCallable(host,
port, request));
     Futures.addCallback(future, new FutureCallback<SubmitWorkResponseProto>() {
       @Override
       public void onSuccess(SubmitWorkResponseProto result) {
@@ -68,23 +74,66 @@ public class TaskCommunicator extends Ab
 
   }
 
-  private class SubmitWorkCallable implements Callable<SubmitWorkResponseProto> {
  /**
   * Dispatches a source-state update RPC to {@code host:port} on the shared executor and
   * routes the async result (or failure) to the supplied callback. Mirrors the structure of
   * {@code submitWork} above.
   */
  public void sendSourceStateUpdate(final SourceStateUpdatedRequestProto request,
      final String host, final int port,
      final ExecuteRequestCallback<SourceStateUpdatedResponseProto> callback) {
    ListenableFuture<SourceStateUpdatedResponseProto> future =
        executor.submit(new SendSourceStateUpdateCallable(host, port, request));
    Futures.addCallback(future, new FutureCallback<SourceStateUpdatedResponseProto>() {
      @Override
      public void onSuccess(SourceStateUpdatedResponseProto result) {
        callback.setResponse(result);
      }

      @Override
      public void onFailure(Throwable t) {
        callback.indicateError(t);
      }
    });
  }
+
+  private static abstract class CallableRequest<REQUEST extends  Message, RESPONSE extends
Message> implements Callable {
+
     final String hostname;
     final int port;
-    final SubmitWorkRequestProto request;
+    final REQUEST request;
 
-    private SubmitWorkCallable(SubmitWorkRequestProto request, String hostname, int port)
{
+
+    protected CallableRequest(String hostname, int port, REQUEST request) {
       this.hostname = hostname;
       this.port = port;
       this.request = request;
     }
 
+    public abstract RESPONSE call() throws Exception;
+  }
+
  /** Invokes {@code submitWork} on the daemon proxy for the configured host/port. */
  private class SubmitWorkCallable
      extends CallableRequest<SubmitWorkRequestProto, SubmitWorkResponseProto> {

    protected SubmitWorkCallable(String hostname, int port,
                          SubmitWorkRequestProto submitWorkRequestProto) {
      super(hostname, port, submitWorkRequestProto);
    }

    @Override
    public SubmitWorkResponseProto call() throws Exception {
      // Proxy is created lazily / cached per host by getProxy; null RpcController by convention.
      return getProxy(hostname, port).submitWork(null, request);
    }
  }
 
  /** Invokes {@code sourceStateUpdated} on the daemon proxy for the configured host/port. */
  private class SendSourceStateUpdateCallable
      extends CallableRequest<SourceStateUpdatedRequestProto, SourceStateUpdatedResponseProto> {

    public SendSourceStateUpdateCallable(String hostname, int port,
                                         SourceStateUpdatedRequestProto request) {
      super(hostname, port, request);
    }

    @Override
    public SourceStateUpdatedResponseProto call() throws Exception {
      // Same proxy/caching and null-controller convention as SubmitWorkCallable.
      return getProxy(hostname, port).sourceStateUpdated(null, request);
    }
  }
+
   public interface ExecuteRequestCallback<T extends Message> {
     void setResponse(T response);
     void indicateError(Throwable t);

Added: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/SourceStateTracker.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/SourceStateTracker.java?rev=1672517&view=auto
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/SourceStateTracker.java
(added)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/SourceStateTracker.java
Thu Apr  9 23:12:15 2015
@@ -0,0 +1,212 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.tezplugins.helpers;
+
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.llap.LlapNodeId;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto;
+import org.apache.hadoop.hive.llap.tezplugins.Converters;
+import org.apache.hadoop.hive.llap.tezplugins.LlapTaskCommunicator;
+import org.apache.tez.dag.api.TaskCommunicatorContext;
+import org.apache.tez.dag.api.event.VertexState;
+import org.apache.tez.mapreduce.input.MRInputLegacy;
+import org.apache.tez.runtime.api.impl.InputSpec;
+
/**
 * AM-side tracker of upstream source (vertex) states for the current DAG.
 *
 * <p>For each task registered via {@link #addTask}, records which sources the task's node
 * cares about, registers for Tez vertex-state notifications for those sources, and pushes
 * state updates to interested daemon nodes via the {@link LlapTaskCommunicator} — sending
 * each (node, source) pair an update only when the state actually changes.
 *
 * <p>Thread-safety: all mutation of the internal maps happens in {@code synchronized}
 * methods on this instance; {@code currentDagName} is volatile so
 * {@link #sendStateUpdateToNode} (not synchronized) reads a current value.
 */
public class SourceStateTracker {

  private static final Log LOG = LogFactory.getLog(SourceStateTracker.class);

  private final TaskCommunicatorContext taskCommunicatorContext;
  private final LlapTaskCommunicator taskCommunicator;

  // Tracks vertices for which notifications have been registered
  private final Set<String> notificationRegisteredVertices = new HashSet<>();

  // Per-source last-known state + which nodes want updates for it.
  private final Map<String, SourceInfo> sourceInfoMap = new HashMap<>();
  // Per-node view: last state this node was told about, per source.
  private final Map<LlapNodeId, NodeInfo> nodeInfoMap = new HashMap<>();

  // Name of the DAG currently being tracked; stamped into outgoing update requests.
  private volatile String currentDagName;


  public SourceStateTracker(TaskCommunicatorContext taskCommunicatorContext,
                            LlapTaskCommunicator taskCommunicator) {
    this.taskCommunicatorContext = taskCommunicatorContext;
    this.taskCommunicator = taskCommunicator;
  }

  /**
   * To be invoked after each DAG completes.
   * Clears all per-DAG tracking state and records the new DAG's name.
   */
  public synchronized void resetState(String newDagName) {
    sourceInfoMap.clear();
    nodeInfoMap.clear();
    notificationRegisteredVertices.clear();
    this.currentDagName = newDagName;
  }

  /**
   * Registers a task running on {@code host:port} and the sources its inputs depend on.
   * If a tracked source is already known to have SUCCEEDED, the node is told immediately;
   * otherwise updates flow through {@link #sourceStateUpdated} as notifications arrive.
   */
  public synchronized void addTask(String host, int port, List<InputSpec> inputSpecList) {

    // Add tracking information. Check if source state already known and send out an update if it is.

    List<String> sourcesOfInterest = getSourceInterestList(inputSpecList);
    if (sourcesOfInterest != null && !sourcesOfInterest.isEmpty()) {
      LlapNodeId nodeId = LlapNodeId.getInstance(host, port);
      NodeInfo nodeInfo = getNodeInfo(nodeId);

      // Set up the data structures, before any notifications come in.
      for (String src : sourcesOfInterest) {
        VertexState oldStateForNode = nodeInfo.getLastKnownStateForSource(src);
        if (oldStateForNode == null) {
          // Not registered for this node.
          // Register and send state if it is successful.
          SourceInfo srcInfo = getSourceInfo(src);
          srcInfo.addNode(nodeId);

          nodeInfo.addSource(src, srcInfo.lastKnownState);
          if (srcInfo.lastKnownState == VertexState.SUCCEEDED) {
            sendStateUpdateToNode(nodeId, src, srcInfo.lastKnownState);
          }

        } else {
          // Already registered to send updates to this node for the specific source.
          // Nothing to do for now, unless tracking tasks at a later point.
        }

        // Setup for actual notifications, if not already done for a previous task.
        maybeRegisterForVertexUpdates(src);
      }
    } else {
      // Don't need to track anything for this task. No new notifications, etc.
    }
  }

  /**
   * Handles a state-change notification for {@code sourceName}: records the new state and
   * fans it out to every interested node whose last-known state differs.
   */
  public synchronized void sourceStateUpdated(String sourceName, VertexState sourceState) {
    SourceInfo sourceInfo = getSourceInfo(sourceName);
    sourceInfo.lastKnownState = sourceState;
    // Checking state per node for future failure handling scenarios, where an update
    // to a single node may fail.
    for (LlapNodeId nodeId : sourceInfo.getInterestedNodes()) {
      NodeInfo nodeInfo = nodeInfoMap.get(nodeId);
      VertexState lastStateForNode = nodeInfo.getLastKnownStateForSource(sourceName);
      // Send only if the state has changed.
      if (lastStateForNode != sourceState) {
        nodeInfo.setLastKnownStateForSource(sourceName, sourceState);
        sendStateUpdateToNode(nodeId, sourceName, sourceState);
      }
    }
  }


  /** Per-source tracking record: last known state and the nodes interested in it. */
  private static class SourceInfo {
    private final List<LlapNodeId> interestedNodes = new LinkedList<>();
    // Always start in the running state. Requests for state updates will be sent out after registration.
    private VertexState lastKnownState = VertexState.RUNNING;

    void addNode(LlapNodeId nodeId) {
      interestedNodes.add(nodeId);
    }

    List<LlapNodeId> getInterestedNodes() {
      return this.interestedNodes;
    }
  }

  // Lazily creates the SourceInfo for a source name.
  private synchronized SourceInfo getSourceInfo(String srcName) {
    SourceInfo sourceInfo = sourceInfoMap.get(srcName);
    if (sourceInfo == null) {
      sourceInfo = new SourceInfo();
      sourceInfoMap.put(srcName, sourceInfo);
    }
    return sourceInfo;
  }


  /** Per-node tracking record: last state communicated to the node, per source. */
  private static class NodeInfo {
    private final Map<String, VertexState> sourcesOfInterest = new HashMap<>();

    void addSource(String srcName, VertexState sourceState) {
      sourcesOfInterest.put(srcName, sourceState);
    }

    VertexState getLastKnownStateForSource(String src) {
      return sourcesOfInterest.get(src);
    }

    void setLastKnownStateForSource(String src, VertexState state) {
      sourcesOfInterest.put(src, state);
    }
  }

  // Lazily creates the NodeInfo for a node id.
  private synchronized NodeInfo getNodeInfo(LlapNodeId llapNodeId) {
    NodeInfo nodeInfo = nodeInfoMap.get(llapNodeId);
    if (nodeInfo == null) {
      nodeInfo = new NodeInfo();
      nodeInfoMap.put(llapNodeId, nodeInfo);
    }
    return nodeInfo;
  }


  /**
   * Extracts the names of source vertices worth tracking from a task's input specs.
   * Returns an empty list (never null) when nothing needs tracking.
   */
  private List<String> getSourceInterestList(List<InputSpec> inputSpecList) {
    List<String> sourcesOfInterest = Collections.emptyList();
    if (inputSpecList != null) {
      boolean alreadyFound = false;
      for (InputSpec inputSpec : inputSpecList) {
        if (isSourceOfInterest(inputSpec)) {
          if (!alreadyFound) {
            alreadyFound = true;
            sourcesOfInterest = new LinkedList<>();
          }
          sourcesOfInterest.add(inputSpec.getSourceVertexName());
        }
      }
    }
    return sourcesOfInterest;
  }


  // Registers for RUNNING/SUCCEEDED notifications on a vertex, at most once per source.
  // Only called from synchronized addTask, which guards notificationRegisteredVertices.
  private void maybeRegisterForVertexUpdates(String sourceName) {
    if (!notificationRegisteredVertices.contains(sourceName)) {
      notificationRegisteredVertices.add(sourceName);
      taskCommunicatorContext.registerForVertexStateUpdates(sourceName, EnumSet.of(
          VertexState.RUNNING, VertexState.SUCCEEDED));
    }
  }

  // A source is "of interest" unless it is an MRInput.
  private boolean isSourceOfInterest(InputSpec inputSpec) {
    String inputClassName = inputSpec.getInputDescriptor().getClassName();
    // MRInput is not of interest since it'll always be ready.
    return !inputClassName.equals(MRInputLegacy.class.getName());
  }

  // Builds and sends the wire-level update for (node, source, state). Reads the volatile
  // currentDagName; the RPC itself is asynchronous inside the task communicator.
  void sendStateUpdateToNode(LlapNodeId nodeId, String sourceName, VertexState state) {
    taskCommunicator.sendStateUpdate(nodeId.getHostname(), nodeId.getPort(),
        SourceStateUpdatedRequestProto.newBuilder().setDagName(currentDagName).setSrcName(
            sourceName)
            .setState(Converters.fromVertexState(state)).build());
  }


}

Modified: hive/branches/llap/llap-server/src/protobuf/LlapDaemonProtocol.proto
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/protobuf/LlapDaemonProtocol.proto?rev=1672517&r1=1672516&r2=1672517&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/protobuf/LlapDaemonProtocol.proto (original)
+++ hive/branches/llap/llap-server/src/protobuf/LlapDaemonProtocol.proto Thu Apr  9 23:12:15
2015
@@ -60,6 +60,11 @@ message FragmentSpecProto {
   optional int32 attempt_number = 10;
 }
 
// Last known state of an upstream source (vertex), mirrored from the Tez VertexState
// values the AM registers for (see Converters.fromVertexState).
enum SourceStateProto {
  S_SUCCEEDED = 1;
  S_RUNNING = 2;
}
+
 message SubmitWorkRequestProto {
   optional string container_id_string = 1;
   optional string am_host = 2;
@@ -75,6 +80,16 @@ message SubmitWorkRequestProto {
 message SubmitWorkResponseProto {
 }
 
// Notifies a daemon that the state of one source (upstream vertex) of the named DAG changed.
message SourceStateUpdatedRequestProto {
  optional string dag_name = 1;
  optional string src_name = 2;
  optional SourceStateProto state = 3;
}

// Empty acknowledgement for sourceStateUpdated.
message SourceStateUpdatedResponseProto {
}
+
// AM -> daemon control protocol: fragment submission plus source-state notifications.
service LlapDaemonProtocol {
  rpc submitWork(SubmitWorkRequestProto) returns (SubmitWorkResponseProto);
  rpc sourceStateUpdated(SourceStateUpdatedRequestProto) returns (SourceStateUpdatedResponseProto);
}



Mime
View raw message