apex-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t..@apache.org
Subject [1/2] apex-core git commit: APEXCORE-294 controlled shutdown of an application.
Date Fri, 03 Feb 2017 15:37:23 GMT
Repository: apex-core
Updated Branches:
  refs/heads/master 527c70bf8 -> f04e07c03


APEXCORE-294 controlled shutdown of an application.


Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/51076c6c
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/51076c6c
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/51076c6c

Branch: refs/heads/master
Commit: 51076c6cb4e5abe333b207b5c5a821e683a8bfb0
Parents: 1e9896b
Author: Tushar R. Gosavi <tushar@apache.org>
Authored: Mon Jan 23 16:59:54 2017 +0530
Committer: Tushar R. Gosavi <tushar@apache.org>
Committed: Thu Feb 2 11:47:20 2017 +0530

----------------------------------------------------------------------
 docs/apex_cli.md                                | 10 +++++++
 .../datatorrent/stram/StramLocalCluster.java    |  4 +--
 .../stram/StreamingContainerAgent.java          | 13 ++++++++-
 .../stram/StreamingContainerManager.java        | 25 ++++++++---------
 .../StreamingContainerUmbilicalProtocol.java    | 20 +++++++++++++-
 .../java/com/datatorrent/stram/engine/Node.java | 20 +++++++++++---
 .../stram/engine/StreamingContainer.java        | 28 ++++++++++++++++----
 .../stram/webapp/StramWebServices.java          |  3 ++-
 8 files changed, 98 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-core/blob/51076c6c/docs/apex_cli.md
----------------------------------------------------------------------
diff --git a/docs/apex_cli.md b/docs/apex_cli.md
index da62a82..97d082f 100644
--- a/docs/apex_cli.md
+++ b/docs/apex_cli.md
@@ -266,3 +266,13 @@ submit
 
 *Note*:  To perform runtime logical plan changes, like ability to add new operators,
 they must be part of the jar files that were deployed at application launch time.
+
+*kill-app* terminates operators in an undefined order, so some operators may not process
+data emitted by input operators. Such an application can be restarted from its last checkpointed
+state using the `launch -originalAppId` command. As the application is relaunched from the
+checkpointed state, there will be no data loss.
+
+*shutdown-app* terminates the application while making sure that all data
+emitted by input operators is processed throughout the DAG. An application terminated with the
+`shutdown-app` command cannot be restarted, as the application is considered to have completed
+successfully.

http://git-wip-us.apache.org/repos/asf/apex-core/blob/51076c6c/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java b/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java
index e188b60..2ffbabd 100644
--- a/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java
+++ b/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java
@@ -135,7 +135,7 @@ public class StramLocalCluster implements Runnable, Controller
     {
       if (injectShutdown.containsKey(msg.getContainerId())) {
         ContainerHeartbeatResponse r = new ContainerHeartbeatResponse();
-        r.shutdown = true;
+        r.shutdown = ShutdownType.ABORT;
         return r;
       }
       try {
@@ -466,7 +466,7 @@ public class StramLocalCluster implements Runnable, Controller
         StreamingContainer c = childContainers.get(containerIdStr);
         if (c != null) {
           ContainerHeartbeatResponse r = new ContainerHeartbeatResponse();
-          r.shutdown = true;
+          r.shutdown = StreamingContainerUmbilicalProtocol.ShutdownType.ABORT;
           c.processHeartbeatResponse(r);
         }
         dnmgr.containerStopRequests.remove(containerIdStr);

http://git-wip-us.apache.org/repos/asf/apex-core/blob/51076c6c/engine/src/main/java/com/datatorrent/stram/StreamingContainerAgent.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerAgent.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerAgent.java
index 2ea37f4..1d0897d 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerAgent.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerAgent.java
@@ -51,6 +51,7 @@ import com.datatorrent.stram.api.OperatorDeployInfo.InputDeployInfo;
 import com.datatorrent.stram.api.OperatorDeployInfo.OperatorType;
 import com.datatorrent.stram.api.OperatorDeployInfo.OutputDeployInfo;
 import com.datatorrent.stram.api.OperatorDeployInfo.UnifierDeployInfo;
+import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.ShutdownType;
 import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.StramToNodeRequest;
 import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.StreamingContainerContext;
 import com.datatorrent.stram.engine.OperatorContext;
@@ -96,7 +97,7 @@ public class StreamingContainerAgent
     this.dnmgr = dnmgr;
   }
 
-  boolean shutdownRequested = false;
+  ShutdownType shutdownRequest = null;
   boolean stackTraceRequested = false;
 
   Set<PTOperator> deployOpers = Sets.newHashSet();
@@ -484,4 +485,14 @@ public class StreamingContainerAgent
   }
 
   public volatile String containerStackTrace = null;
+
+  public void requestShutDown(ShutdownType type)
+  {
+    shutdownRequest = type;
+  }
+
+  public boolean isShutdownRequested()
+  {
+    return (shutdownRequest != null);
+  }
 }

http://git-wip-us.apache.org/repos/asf/apex-core/blob/51076c6c/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
index 45bfcdb..00a406c 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
@@ -134,6 +134,7 @@ import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.ContainerHe
 import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.ContainerHeartbeatResponse;
 import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.ContainerStats;
 import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.OperatorHeartbeat;
+import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.ShutdownType;
 import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.StramToNodeRequest;
 import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.StreamingContainerContext;
 import com.datatorrent.stram.engine.OperatorResponse;
@@ -761,7 +762,7 @@ public class StreamingContainerManager implements PlanContext
         for (PTContainer c : pendingAllocation) {
           LOG.warn("Waiting for resource: {}m priority: {} {}", c.getRequiredMemoryMB(), c.getResourceRequestPriority(), c);
         }
-        shutdownAllContainers(msg);
+        shutdownAllContainers(ShutdownType.ABORT, msg);
         this.forcedShutdown = true;
       } else {
         for (PTContainer c : pendingAllocation) {
@@ -1121,7 +1122,7 @@ public class StreamingContainerManager implements PlanContext
   public void scheduleContainerRestart(String containerId)
   {
     StreamingContainerAgent cs = this.getContainerAgent(containerId);
-    if (cs == null || cs.shutdownRequested) {
+    if (cs == null || cs.isShutdownRequested()) {
       // the container is no longer used / was released by us
       return;
     }
@@ -1428,7 +1429,7 @@ public class StreamingContainerManager implements PlanContext
       } else {
         String msg = String.format("Shutdown after reaching failure threshold for %s", oper);
         LOG.warn(msg);
-        shutdownAllContainers(msg);
+        shutdownAllContainers(ShutdownType.ABORT, msg);
         forcedShutdown = true;
       }
     } else {
@@ -1454,7 +1455,7 @@ public class StreamingContainerManager implements PlanContext
       // could be orphaned container that was replaced and needs to terminate
       LOG.error("Unknown container {}", heartbeat.getContainerId());
       ContainerHeartbeatResponse response = new ContainerHeartbeatResponse();
-      response.shutdown = true;
+      response.shutdown = ShutdownType.ABORT;
       return response;
     }
 
@@ -1771,11 +1772,11 @@ public class StreamingContainerManager implements PlanContext
 
     if (heartbeat.getContainerStats().operators.isEmpty() && isApplicationIdle()) {
       LOG.info("requesting idle shutdown for container {}", heartbeat.getContainerId());
-      rsp.shutdown = true;
+      rsp.shutdown = ShutdownType.ABORT;
     } else {
-      if (sca.shutdownRequested) {
+      if (sca.isShutdownRequested()) {
         LOG.info("requesting shutdown for container {}", heartbeat.getContainerId());
-        rsp.shutdown = true;
+        rsp.shutdown = sca.shutdownRequest;
       }
     }
 
@@ -2191,7 +2192,6 @@ public class StreamingContainerManager implements PlanContext
     int operatorCount = 0;
     UpdateCheckpointsContext ctx = new UpdateCheckpointsContext(clock, recovery, getCheckpointGroups());
     for (OperatorMeta logicalOperator : plan.getLogicalPlan().getRootOperators()) {
-      //LOG.debug("Updating checkpoints for operator {}", logicalOperator.getName());
       List<PTOperator> operators = plan.getOperators(logicalOperator);
       if (operators != null) {
         for (PTOperator operator : operators) {
@@ -2257,14 +2257,15 @@ public class StreamingContainerManager implements PlanContext
    * If containers don't respond, the application can be forcefully terminated
    * via yarn using forceKillApplication.
    *
+   * @param type
    * @param message
    */
-  public void shutdownAllContainers(String message)
+  public void shutdownAllContainers(ShutdownType type, String message)
   {
     this.shutdownDiagnosticsMessage = message;
-    LOG.info("Initiating application shutdown: {}", message);
+    LOG.info("Initiating application shutdown: type {} {}", type, message);
     for (StreamingContainerAgent cs : this.containers.values()) {
-      cs.shutdownRequested = true;
+      cs.requestShutDown(type);
     }
   }
 
@@ -2372,7 +2373,7 @@ public class StreamingContainerManager implements PlanContext
           LOG.debug("Container marked for shutdown: {}", c);
           // container already removed from plan
           // TODO: monitor soft shutdown
-          sca.shutdownRequested = true;
+          sca.requestShutDown(ShutdownType.ABORT);
         }
       }
 

http://git-wip-us.apache.org/repos/asf/apex-core/blob/51076c6c/engine/src/main/java/com/datatorrent/stram/api/StreamingContainerUmbilicalProtocol.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/api/StreamingContainerUmbilicalProtocol.java b/engine/src/main/java/com/datatorrent/stram/api/StreamingContainerUmbilicalProtocol.java
index 150e3b3..77a33e6 100644
--- a/engine/src/main/java/com/datatorrent/stram/api/StreamingContainerUmbilicalProtocol.java
+++ b/engine/src/main/java/com/datatorrent/stram/api/StreamingContainerUmbilicalProtocol.java
@@ -341,6 +341,24 @@ public interface StreamingContainerUmbilicalProtocol extends VersionedProtocol
     }
   }
 
+  enum ShutdownType
+  {
+    /**
+     * WAIT_TERMINATE: when this value is set, all input operators in the
+     * container terminate and send an END_STREAM tuple to downstream
+     * operators. This type of shutdown makes sure that data emitted by upstream
+     * operators before shutdown is processed by downstream operators before
+     * they terminate.
+     */
+    WAIT_TERMINATE,
+    /**
+     * ABORT: in a few cases we need to shut down a container forcefully, such
+     * as a resource allocation timeout, a stale container, or a container without any operator.
+     * In such cases this value is used to send the shutdown request to the container.
+     */
+    ABORT,
+  }
+
   /**
    *
    * Response from the stram to the container heartbeat
@@ -354,7 +372,7 @@ public interface StreamingContainerUmbilicalProtocol extends VersionedProtocol
     /**
      * Indicate container to exit heartbeat loop and shutdown.
      */
-    public boolean shutdown;
+    public ShutdownType shutdown;
 
     /**
      * Optional list of responses for operators in the container.

http://git-wip-us.apache.org/repos/asf/apex-core/blob/51076c6c/engine/src/main/java/com/datatorrent/stram/engine/Node.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/engine/Node.java b/engine/src/main/java/com/datatorrent/stram/engine/Node.java
index 4a5cbde..d779afe 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/Node.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/Node.java
@@ -317,9 +317,18 @@ public abstract class Node<OPERATOR extends Operator> implements Component<Opera
   protected ProcessingMode PROCESSING_MODE;
   protected volatile boolean shutdown;
 
-  public void shutdown()
+  /**
+   * Shutdown the current node.
+   * If processEndWindow is set to true, then after shutting down, endWindow is called and
+   * END_STREAM tuple is sent to downstream operators.
+   *
+   * @param processEndWindow
+   */
+  public void shutdown(boolean processEndWindow)
   {
-    shutdown = true;
+    if (!processEndWindow) {
+      shutdown = true;
+    }
 
     synchronized (this) {
       alive = false;
@@ -345,6 +354,11 @@ public abstract class Node<OPERATOR extends Operator> implements
Component<Opera
     }
   }
 
+  public void shutdown()
+  {
+    shutdown(false);
+  }
+
   @Override
   public String toString()
   {
@@ -353,7 +367,7 @@ public abstract class Node<OPERATOR extends Operator> implements
Component<Opera
 
   protected void emitEndStream()
   {
-    // logger.debug("{} sending EndOfStream", this);
+    logger.info("{} sending EndOfStream", this);
     /*
      * since we are going away, we should let all the downstream operators know that.
      */

http://git-wip-us.apache.org/repos/asf/apex-core/blob/51076c6c/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java b/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
index 86c0402..c3886b4 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
@@ -807,11 +807,15 @@ public class StreamingContainer extends YarnContainerMain
       undeploy(rsp.undeployRequest);
     }
 
-    if (rsp.shutdown) {
-      logger.info("Received shutdown request");
-      processNodeRequests(false);
-      this.exitHeartbeatLoop = true;
-      return;
+    if (rsp.shutdown != null) {
+      logger.info("Received shutdown request type {}", rsp.shutdown);
+      if (rsp.shutdown == StreamingContainerUmbilicalProtocol.ShutdownType.ABORT) {
+        processNodeRequests(false);
+        this.exitHeartbeatLoop = true;
+        return;
+      } else if (rsp.shutdown == StreamingContainerUmbilicalProtocol.ShutdownType.WAIT_TERMINATE) {
+        stopInputNodes();
+      }
     }
 
     if (rsp.deployRequest != null) {
@@ -833,6 +837,20 @@ public class StreamingContainer extends YarnContainerMain
     processNodeRequests(true);
   }
 
+  private void stopInputNodes()
+  {
+    for (Entry<Integer, Node<?>> e : nodes.entrySet()) {
+      Node<?> node = e.getValue();
+      if (node instanceof InputNode) {
+        final Thread thread = e.getValue().context.getThread();
+        if (thread == null || !thread.isAlive()) {
+          continue;
+        }
+      }
+      node.shutdown(true);
+    }
+  }
+
   private int getOutputQueueCapacity(List<OperatorDeployInfo> operatorList, int sourceOperatorId, String sourcePortName)
   {
     for (OperatorDeployInfo odi : operatorList) {

http://git-wip-us.apache.org/repos/asf/apex-core/blob/51076c6c/engine/src/main/java/com/datatorrent/stram/webapp/StramWebServices.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/webapp/StramWebServices.java b/engine/src/main/java/com/datatorrent/stram/webapp/StramWebServices.java
index 16b7ed7..995127c 100644
--- a/engine/src/main/java/com/datatorrent/stram/webapp/StramWebServices.java
+++ b/engine/src/main/java/com/datatorrent/stram/webapp/StramWebServices.java
@@ -84,6 +84,7 @@ import com.datatorrent.stram.StramUtils;
 import com.datatorrent.stram.StreamingContainerAgent;
 import com.datatorrent.stram.StreamingContainerManager;
 import com.datatorrent.stram.StringCodecs;
+import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol;
 import com.datatorrent.stram.codec.LogicalPlanSerializer;
 import com.datatorrent.stram.plan.logical.LogicalPlan;
 import com.datatorrent.stram.plan.logical.LogicalPlan.ModuleMeta;
@@ -361,7 +362,7 @@ public class StramWebServices
   {
     init();
     LOG.debug("Shutdown requested");
-    dagManager.shutdownAllContainers("Shutdown requested externally.");
+    dagManager.shutdownAllContainers(StreamingContainerUmbilicalProtocol.ShutdownType.WAIT_TERMINATE, "Shutdown requested externally.");
     return new JSONObject();
   }
 


Mime
View raw message