apex-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bhup...@apache.org
Subject apex-core git commit: APEXCORE-602: group events by cause
Date Thu, 29 Jun 2017 05:54:45 GMT
Repository: apex-core
Updated Branches:
  refs/heads/master 1f78515fc -> 2ba608444


APEXCORE-602: group events by cause


Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/2ba60844
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/2ba60844
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/2ba60844

Branch: refs/heads/master
Commit: 2ba6084440bdfc23d246cbb397ef46662d3a1688
Parents: 1f78515
Author: priya <priyag@apache.org>
Authored: Thu Feb 23 17:25:56 2017 +0530
Committer: priya <priyag@apache.org>
Committed: Wed Jun 28 22:17:43 2017 +0530

----------------------------------------------------------------------
 .../stram/StreamingAppMasterService.java        |  13 +-
 .../stram/StreamingContainerManager.java        |  29 ++-
 .../stram/StreamingContainerParent.java         |  20 +-
 .../com/datatorrent/stram/api/StramEvent.java   |  83 ++++---
 .../org/apache/apex/stram/GroupingManager.java  | 232 +++++++++++++++++++
 .../org/apache/apex/stram/GroupingRequest.java  | 184 +++++++++++++++
 .../apach/apex/stram/GroupingManagerTest.java   | 148 ++++++++++++
 .../apach/apex/stram/GroupingRequestTest.java   |  81 +++++++
 8 files changed, 746 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-core/blob/2ba60844/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
index ed9248a..09478eb 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
@@ -49,6 +49,8 @@ import org.apache.apex.engine.plugin.DefaultApexPluginDispatcher;
 import org.apache.apex.engine.plugin.loaders.ChainedPluginLocator;
 import org.apache.apex.engine.plugin.loaders.PropertyBasedPluginLocator;
 import org.apache.apex.engine.plugin.loaders.ServiceLoaderBasedPluginLocator;
+import org.apache.apex.stram.GroupingManager;
+import org.apache.apex.stram.GroupingRequest.EventGroupId;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang3.tuple.MutablePair;
@@ -170,6 +172,7 @@ public class StreamingAppMasterService extends CompositeService
   private StramDelegationTokenManager delegationTokenManager = null;
   private AppDataPushAgent appDataPushAgent;
   private ApexPluginDispatcher apexPluginDispatcher;
+  private final GroupingManager groupingManager = GroupingManager.getGroupingManagerInstance();
 
   public StreamingAppMasterService(ApplicationAttemptId appAttemptID)
   {
@@ -972,7 +975,8 @@ public class StreamingAppMasterService extends CompositeService
           launchContainer.run(); // communication with NMs is now async
 
           // record container start event
-          StramEvent ev = new StramEvent.StartContainerEvent(allocatedContainer.getId().toString(),
allocatedContainer.getNodeId().toString());
+          StramEvent ev = new StramEvent.StartContainerEvent(allocatedContainer.getId().toString(),
+              allocatedContainer.getNodeId().toString(), groupingManager.getEventGroupIdForAffectedContainer(allocatedContainer.getId().toString()));
           ev.setTimestamp(timestamp);
           dnmgr.recordEventAsync(ev);
         }
@@ -997,6 +1001,7 @@ public class StreamingAppMasterService extends CompositeService
           UserGroupInformation ugi = UserGroupInformation.getLoginUser();
           delegationTokenManager.cancelToken(allocatedContainer.delegationToken, ugi.getUserName());
         }
+        EventGroupId groupId = null;
         int exitStatus = containerStatus.getExitStatus();
         if (0 != exitStatus) {
           if (allocatedContainer != null) {
@@ -1039,7 +1044,9 @@ public class StreamingAppMasterService extends CompositeService
           // Recoverable failure or process killed (externally or via stop request by AM)
           // also occurs when a container was released by the application but never assigned/launched
           LOG.debug("Container {} failed or killed.", containerStatus.getContainerId());
-          dnmgr.scheduleContainerRestart(containerStatus.getContainerId().toString());
+          String containerIdStr = containerStatus.getContainerId().toString();
+          dnmgr.scheduleContainerRestart(containerIdStr);
+          groupId = groupingManager.getEventGroupIdForAffectedContainer(containerIdStr);
 //          }
         } else {
           // container completed successfully
@@ -1057,7 +1064,7 @@ public class StreamingAppMasterService extends CompositeService
         dnmgr.removeContainerAgent(containerIdStr);
 
         // record container stop event
-        StramEvent ev = new StramEvent.StopContainerEvent(containerIdStr, containerStatus.getExitStatus());
+        StramEvent ev = new StramEvent.StopContainerEvent(containerIdStr, containerStatus.getExitStatus(),
groupId);
         ev.setReason(containerStatus.getDiagnostics());
         dnmgr.recordEventAsync(ev);
       }

http://git-wip-us.apache.org/repos/asf/apex-core/blob/2ba60844/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
index 510a146..8d2406f 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
@@ -69,6 +69,8 @@ import org.apache.apex.engine.api.plugin.DAGExecutionEvent;
 import org.apache.apex.engine.plugin.ApexPluginDispatcher;
 import org.apache.apex.engine.plugin.NoOpApexPluginDispatcher;
 import org.apache.apex.engine.util.CascadeStorageAgent;
+import org.apache.apex.stram.GroupingManager;
+import org.apache.apex.stram.GroupingRequest.EventGroupId;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.builder.ToStringBuilder;
@@ -260,6 +262,7 @@ public class StreamingContainerManager implements PlanContext
   //logical operator name to latest counters. exists for backward compatibility.
   private final Map<String, Object> latestLogicalCounters = Maps.newHashMap();
   public transient ApexPluginDispatcher apexPluginDispatcher = new NoOpApexPluginDispatcher();
+  private final GroupingManager groupingManager = GroupingManager.getGroupingManagerInstance();
 
   private final LinkedHashMap<String, ContainerInfo> completedContainers = new LinkedHashMap<String,
ContainerInfo>()
   {
@@ -799,7 +802,7 @@ public class StreamingContainerManager implements PlanContext
               if (sca.lastHeartbeatMillis != -1) {
                 String msg = String.format("Container %s@%s heartbeat timeout  (%d%n ms).",
c.getExternalId(), c.host, currentTms - sca.lastHeartbeatMillis);
                 LOG.warn(msg);
-                StramEvent stramEvent = new StramEvent.ContainerErrorEvent(c.getExternalId(),
msg, null);
+                StramEvent stramEvent = new StramEvent.ContainerErrorEvent(c.getExternalId(),
msg, null, null);
                 stramEvent.setReason(msg);
                 recordEventAsync(stramEvent);
                 sca.lastHeartbeatMillis = -1;
@@ -1163,6 +1166,9 @@ public class StreamingContainerManager implements PlanContext
     }
     includeLocalUpstreamOperators(ctx);
 
+    groupingManager.addOrModifyGroupingRequest(containerId, ctx.visited);
+    groupingManager.removeProcessedGroupingRequests();
+
     // redeploy cycle for all affected operators
     LOG.info("Affected operators {}", ctx.visited);
     deploy(Collections.<PTContainer>emptySet(), ctx.visited, Sets.newHashSet(cs.container),
ctx.visited);
@@ -1204,7 +1210,7 @@ public class StreamingContainerManager implements PlanContext
     if (containerAgent != null) {
       // record operator stop for this container
       for (PTOperator oper : containerAgent.container.getOperators()) {
-        StramEvent ev = new StramEvent.StopOperatorEvent(oper.getName(), oper.getId(), containerId);
+        StramEvent ev = new StramEvent.StopOperatorEvent(oper.getName(), oper.getId(), containerId,
groupingManager.getEventGroupIdForContainer(containerId));
         recordEventAsync(ev);
       }
       containerAgent.container.setFinishedTime(System.currentTimeMillis());
@@ -1279,6 +1285,8 @@ public class StreamingContainerManager implements PlanContext
       return null;
     }
 
+    groupingManager.addNewContainerToGroupingRequest(container.getExternalId(), resource.containerId);
+
     pendingAllocation.remove(container);
     container.setState(PTContainer.State.ALLOCATED);
     if (container.getExternalId() != null) {
@@ -1381,13 +1389,16 @@ public class StreamingContainerManager implements PlanContext
               sca.undeployOpers.add(oper.getId());
               slowestUpstreamOp.remove(oper);
               // record operator stop event
-              recordEventAsync(new StramEvent.StopOperatorEvent(oper.getName(), oper.getId(),
oper.getContainer().getExternalId()));
+              recordEventAsync(new StramEvent.StopOperatorEvent(oper.getName(), oper.getId(),
oper.getContainer().getExternalId(), null));
               break;
             case FAILED:
               processOperatorFailure(oper);
               sca.undeployOpers.add(oper.getId());
               slowestUpstreamOp.remove(oper);
-              recordEventAsync(new StramEvent.StopOperatorEvent(oper.getName(), oper.getId(),
oper.getContainer().getExternalId()));
+
+              EventGroupId groupId = groupingManager.getEventGroupIdForContainer(oper.getContainer().getExternalId());
+              recordEventAsync(new StramEvent.StopOperatorEvent(oper.getName(), oper.getId(),
+                  oper.getContainer().getExternalId(), groupId));
               break;
             case ACTIVE:
             default:
@@ -1397,8 +1408,9 @@ public class StreamingContainerManager implements PlanContext
         break;
       case PENDING_UNDEPLOY:
         if (ds == null) {
+          EventGroupId groupId = groupingManager.moveOperatorFromUndeployListToDeployList(oper);
           // operator no longer deployed in container
-          recordEventAsync(new StramEvent.StopOperatorEvent(oper.getName(), oper.getId(),
oper.getContainer().getExternalId()));
+          recordEventAsync(new StramEvent.StopOperatorEvent(oper.getName(), oper.getId(),
oper.getContainer().getExternalId(), groupId));
           oper.setState(State.PENDING_DEPLOY);
           sca.deployOpers.add(oper);
         } else {
@@ -1418,7 +1430,9 @@ public class StreamingContainerManager implements PlanContext
           oper.setState(PTOperator.State.ACTIVE);
           oper.stats.lastHeartbeat = null; // reset on redeploy
           oper.stats.lastWindowIdChangeTms = clock.getTime();
-          recordEventAsync(new StramEvent.StartOperatorEvent(oper.getName(), oper.getId(),
container.getExternalId()));
+          EventGroupId groupId = groupingManager.getEventGroupIdForOperatorToDeploy(oper.getId());
+          recordEventAsync(new StramEvent.StartOperatorEvent(oper.getName(), oper.getId(),
container.getExternalId(),  groupId));
+          groupingManager.removeOperatorFromGroupingRequest(oper.getId());
         }
         break;
       default:
@@ -1427,7 +1441,7 @@ public class StreamingContainerManager implements PlanContext
           // operator was removed and needs to be undeployed from container
           sca.undeployOpers.add(oper.getId());
           slowestUpstreamOp.remove(oper);
-          recordEventAsync(new StramEvent.StopOperatorEvent(oper.getName(), oper.getId(),
oper.getContainer().getExternalId()));
+          recordEventAsync(new StramEvent.StopOperatorEvent(oper.getName(), oper.getId(),
oper.getContainer().getExternalId(), null));
         }
     }
   }
@@ -2421,6 +2435,7 @@ public class StreamingContainerManager implements PlanContext
           // operator will be deployed after it has been undeployed, if still referenced
by the container
           if (oper.getState() != PTOperator.State.PENDING_UNDEPLOY) {
             oper.setState(PTOperator.State.PENDING_DEPLOY);
+            groupingManager.addOperatorToDeploy(oper.getContainer().getExternalId(), oper);
           }
         }
       }

http://git-wip-us.apache.org/repos/asf/apex-core/blob/2ba60844/engine/src/main/java/com/datatorrent/stram/StreamingContainerParent.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerParent.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerParent.java
index 76f89bd..8401931 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerParent.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerParent.java
@@ -20,12 +20,15 @@ package com.datatorrent.stram;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.util.Collections;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.apex.log.LogFileInformation;
-
+import org.apache.apex.stram.GroupingManager;
+import org.apache.apex.stram.GroupingRequest;
+import org.apache.apex.stram.GroupingRequest.EventGroupId;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.ipc.ProtocolSignature;
@@ -176,20 +179,29 @@ public class StreamingContainerParent extends org.apache.hadoop.service.Composit
   @Override
   public void reportError(String containerId, int[] operators, String msg, LogFileInformation
logFileInfo) throws IOException
   {
+    EventGroupId groupId = getGroupIdForNewGroupingRequest(containerId);
     if (operators == null || operators.length == 0) {
-      dagManager.recordEventAsync(new ContainerErrorEvent(containerId, msg, logFileInfo));
+      dagManager.recordEventAsync(new ContainerErrorEvent(containerId, msg, logFileInfo,
groupId));
     } else {
       for (int operator : operators) {
         OperatorInfo operatorInfo = dagManager.getOperatorInfo(operator);
         if (operatorInfo != null) {
-          dagManager.recordEventAsync(new OperatorErrorEvent(operatorInfo.name, operator,
containerId, msg,
-              logFileInfo));
+          dagManager.recordEventAsync(
+              new OperatorErrorEvent(operatorInfo.name, operator, containerId, msg, logFileInfo,
groupId));
         }
       }
     }
     log(containerId, msg);
   }
 
+  /**
+   * Returns the group id of the grouping request for the container that reported an error,
+   * creating the request if needed. Its operators are filled in when the sub-DAG restart happens.
+   */
+  private EventGroupId getGroupIdForNewGroupingRequest(String containerId)
+  {
+    GroupingRequest request = GroupingManager.getGroupingManagerInstance()
+        .addOrModifyGroupingRequest(containerId, Collections.EMPTY_SET);
+    return request.getEventGroupId();
+  }
+
   @Override
   public StreamingContainerContext getInitContext(String containerId)
       throws IOException

http://git-wip-us.apache.org/repos/asf/apex-core/blob/2ba60844/engine/src/main/java/com/datatorrent/stram/api/StramEvent.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/api/StramEvent.java b/engine/src/main/java/com/datatorrent/stram/api/StramEvent.java
index 8af90bc..6224856 100644
--- a/engine/src/main/java/com/datatorrent/stram/api/StramEvent.java
+++ b/engine/src/main/java/com/datatorrent/stram/api/StramEvent.java
@@ -21,6 +21,7 @@ package com.datatorrent.stram.api;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.apex.log.LogFileInformation;
+import org.apache.apex.stram.GroupingRequest.EventGroupId;
 
 import com.datatorrent.stram.plan.logical.requests.LogicalPlanRequest;
 
@@ -38,6 +39,7 @@ public abstract class StramEvent
   private String reason;
   private LogLevel logLevel;
   private LogFileInformation logFileInformation;
+  private EventGroupId groupId;
 
   public abstract String getType();
 
@@ -48,9 +50,15 @@ public abstract class StramEvent
 
   protected StramEvent(LogLevel logLevel, LogFileInformation logFileInformation)
   {
+    this(logLevel, logFileInformation, null);
+  }
+
+  protected StramEvent(LogLevel logLevel, LogFileInformation logFileInformation, EventGroupId
groupId)
+  {
     id = nextId.getAndIncrement();
     this.logLevel = logLevel;
     this.logFileInformation = logFileInformation;
+    this.groupId = groupId;
   }
 
   public long getId()
@@ -98,6 +106,16 @@ public abstract class StramEvent
     this.logFileInformation = logFileInformation;
   }
 
+  /**
+   * @return the id of the event group this event belongs to; may be null for an ungrouped event
+   */
+  public EventGroupId getGroupId()
+  {
+    return this.groupId;
+  }
+
+  /**
+   * Associates this event with an event group.
+   * @param groupId id of the group, or null to leave the event ungrouped
+   */
+  public void setGroupId(EventGroupId groupId)
+  {
+    this.groupId = groupId;
+  }
+
   public static enum LogLevel
   {
     TRACE,
@@ -114,12 +132,12 @@ public abstract class StramEvent
 
     public OperatorEvent(String operatorName, LogLevel logLevel)
     {
-      this(operatorName, logLevel, null);
+      this(operatorName, logLevel, null, null);
     }
 
-    public OperatorEvent(String operatorName, LogLevel logLevel, LogFileInformation logFileInformation)
+    public OperatorEvent(String operatorName, LogLevel logLevel, LogFileInformation logFileInformation,
EventGroupId groupId)
     {
-      super(logLevel, logFileInformation);
+      super(logLevel, logFileInformation, groupId);
       this.operatorName = operatorName;
     }
 
@@ -231,13 +249,18 @@ public abstract class StramEvent
 
     public PhysicalOperatorEvent(String operatorName, int operatorId, LogLevel logLevel)
     {
-      this(operatorName, operatorId, logLevel, null);
+      this(operatorName, operatorId, logLevel, null, null);
+    }
+
+    public PhysicalOperatorEvent(String operatorName, int operatorId, LogLevel logLevel,
EventGroupId groupId)
+    {
+      this(operatorName, operatorId, logLevel, null, groupId);
     }
 
     public PhysicalOperatorEvent(String operatorName, int operatorId, LogLevel logLevel,
-        LogFileInformation logFileInformation)
+        LogFileInformation logFileInformation, EventGroupId groupId)
     {
-      super(operatorName, logLevel, logFileInformation);
+      super(operatorName, logLevel, logFileInformation, groupId);
       this.operatorId = operatorId;
     }
 
@@ -292,14 +315,14 @@ public abstract class StramEvent
   {
     private String containerId;
 
-    public StartOperatorEvent(String operatorName, int operatorId, String containerId)
+    public StartOperatorEvent(String operatorName, int operatorId, String containerId, EventGroupId
groupId)
     {
-      this(operatorName, operatorId, containerId, LogLevel.INFO);
+      this(operatorName, operatorId, containerId, LogLevel.INFO, groupId);
     }
 
-    public StartOperatorEvent(String operatorName, int operatorId, String containerId, LogLevel
logLevel)
+    public StartOperatorEvent(String operatorName, int operatorId, String containerId, LogLevel
logLevel, EventGroupId groupId)
     {
-      super(operatorName, operatorId, logLevel);
+      super(operatorName, operatorId, logLevel, groupId);
       this.containerId = containerId;
     }
 
@@ -325,14 +348,14 @@ public abstract class StramEvent
   {
     private String containerId;
 
-    public StopOperatorEvent(String operatorName, int operatorId, String containerId)
+    public StopOperatorEvent(String operatorName, int operatorId, String containerId, EventGroupId
groupId)
     {
-      this(operatorName, operatorId, containerId, LogLevel.WARN);
+      this(operatorName, operatorId, containerId, LogLevel.WARN, groupId);
     }
 
-    public StopOperatorEvent(String operatorName, int operatorId, String containerId, LogLevel
logLevel)
+    public StopOperatorEvent(String operatorName, int operatorId, String containerId, LogLevel
logLevel,  EventGroupId groupId)
     {
-      super(operatorName, operatorId, logLevel);
+      super(operatorName, operatorId, logLevel, groupId);
       this.containerId = containerId;
     }
 
@@ -404,14 +427,14 @@ public abstract class StramEvent
     String containerId;
     String containerNodeId;
 
-    public StartContainerEvent(String containerId, String containerNodeId)
+    public StartContainerEvent(String containerId, String containerNodeId, EventGroupId groupId)
     {
-      this(containerId, containerNodeId, LogLevel.INFO);
+      this(containerId, containerNodeId, LogLevel.INFO, groupId);
     }
 
-    public StartContainerEvent(String containerId, String containerNodeId, LogLevel logLevel)
+    public StartContainerEvent(String containerId, String containerNodeId, LogLevel logLevel,
EventGroupId groupId)
     {
-      super(logLevel);
+      super(logLevel, null, groupId);
       this.containerId = containerId;
       this.containerNodeId = containerNodeId;
     }
@@ -449,14 +472,14 @@ public abstract class StramEvent
     String containerId;
     int exitStatus;
 
-    public StopContainerEvent(String containerId, int exitStatus)
+    public StopContainerEvent(String containerId, int exitStatus, EventGroupId groupId)
     {
-      this(containerId, exitStatus, LogLevel.WARN);
+      this(containerId, exitStatus, LogLevel.WARN, groupId);
     }
 
-    public StopContainerEvent(String containerId, int exitStatus, LogLevel logLevel)
+    public StopContainerEvent(String containerId, int exitStatus, LogLevel logLevel, EventGroupId
groupId)
     {
-      super(logLevel);
+      super(logLevel, null, groupId);
       this.containerId = containerId;
       this.exitStatus = exitStatus;
     }
@@ -528,15 +551,15 @@ public abstract class StramEvent
     private String errorMessage;
 
     public OperatorErrorEvent(String operatorName, int operatorId, String containerId, String
errorMessage,
-        LogFileInformation logFileInformation)
+        LogFileInformation logFileInformation, EventGroupId groupId)
     {
-      this(operatorName, operatorId, containerId, errorMessage, logFileInformation, LogLevel.ERROR);
+      this(operatorName, operatorId, containerId, errorMessage, logFileInformation, groupId,
LogLevel.ERROR);
     }
 
     public OperatorErrorEvent(String operatorName, int operatorId, String containerId, String
errorMessage,
-        LogFileInformation logFileInformation, LogLevel logLevel)
+        LogFileInformation logFileInformation, EventGroupId groupId, LogLevel logLevel)
     {
-      super(operatorName, operatorId, logLevel, logFileInformation);
+      super(operatorName, operatorId, logLevel, logFileInformation, groupId);
       this.containerId = containerId;
       this.errorMessage = errorMessage;
     }
@@ -574,15 +597,15 @@ public abstract class StramEvent
     private String containerId;
     private String errorMessage;
 
-    public ContainerErrorEvent(String containerId, String errorMessage, LogFileInformation
logFileInformation)
+    public ContainerErrorEvent(String containerId, String errorMessage, LogFileInformation
logFileInformation, EventGroupId groupId)
     {
-      this(containerId, errorMessage, logFileInformation, LogLevel.ERROR);
+      this(containerId, errorMessage, logFileInformation, groupId, LogLevel.ERROR);
     }
 
-    public ContainerErrorEvent(String containerId, String errorMessage, LogFileInformation
logFileInformation,
+    public ContainerErrorEvent(String containerId, String errorMessage, LogFileInformation
logFileInformation, EventGroupId groupId,
         LogLevel logLevel)
     {
-      super(logLevel, logFileInformation);
+      super(logLevel, logFileInformation, groupId);
       this.containerId = containerId;
       this.errorMessage = errorMessage;
     }

http://git-wip-us.apache.org/repos/asf/apex-core/blob/2ba60844/engine/src/main/java/org/apache/apex/stram/GroupingManager.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/org/apache/apex/stram/GroupingManager.java b/engine/src/main/java/org/apache/apex/stram/GroupingManager.java
new file mode 100644
index 0000000..b160dd6
--- /dev/null
+++ b/engine/src/main/java/org/apache/apex/stram/GroupingManager.java
@@ -0,0 +1,232 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.stram;
+
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.stram.GroupingRequest.EventGroupId;
+
+import com.google.common.collect.Maps;
+
+import com.datatorrent.stram.plan.physical.PTOperator;
+
+/**
+ * Tracks {@link GroupingRequest}s so that start/stop events of containers and operators that share
+ * a common cause (e.g. a container failure) can be tagged with the same {@link EventGroupId}.
+ *
+ * NOTE(review): backed by a non-synchronized HashMap; confirm all callers run on one thread.
+ */
+public class GroupingManager
+{
+  private static final GroupingManager groupingManager = new GroupingManager();
+  private final Map<String, GroupingRequest> groupingRequests = Maps.newHashMap();
+
+  public static GroupingManager getGroupingManagerInstance()
+  {
+    return groupingManager;
+  }
+
+  /**
+   * Returns all grouping requests currently tracked by StrAM, keyed by container id.
+   * @return the live (mutable) map of grouping requests
+   */
+  public Map<String, GroupingRequest> getGroupingRequests()
+  {
+    return groupingRequests;
+  }
+
+  /**
+   * Returns the grouping request registered for a container.
+   * @param containerId external id of the container
+   * @return groupingRequest, or null if no request is registered for the container
+   */
+  public GroupingRequest getGroupingRequest(String containerId)
+  {
+    return groupingRequests.get(containerId);
+  }
+
+  /**
+   * Returns the deploy/undeploy group id of the request registered for a container.
+   * @param containerId external id of the container
+   * @return groupId, or null for an independent event that belongs to no group
+   */
+  public EventGroupId getEventGroupIdForContainer(String containerId)
+  {
+    // single lookup instead of calling get() twice
+    GroupingRequest request = groupingRequests.get(containerId);
+    return request == null ? null : request.getEventGroupId();
+  }
+
+  /**
+   * Returns the deploy/undeploy group id for a container. The container may own a grouping
+   * request itself, or be one of the affected containers of another container's request
+   * (e.g. a container newly allocated during that container's redeploy).
+   * @param containerId external id of the container
+   * @return groupId, or null for an independent event that belongs to no group
+   */
+  public EventGroupId getEventGroupIdForAffectedContainer(String containerId)
+  {
+    EventGroupId groupId = getEventGroupIdForContainer(containerId);
+    if (groupId != null) {
+      return groupId;
+    }
+    for (GroupingRequest request : groupingRequests.values()) {
+      if (request.getAffectedContainers().contains(containerId)) {
+        groupId = request.getEventGroupId();
+      }
+    }
+    return groupId;
+  }
+
+  /**
+   * Returns the group id of the request that lists the operator as pending deploy. Operators
+   * not tracked by any request (e.g. deployed for the first time) belong to no group.
+   * If several requests list the operator, the first one found is used.
+   * @param operatorId physical operator id
+   * @return groupId, or null for an independent event that belongs to no group
+   */
+  public EventGroupId getEventGroupIdForOperatorToDeploy(int operatorId)
+  {
+    for (GroupingRequest request : groupingRequests.values()) {
+      if (request.getOperatorsToDeploy().contains(operatorId)) {
+        return request.getEventGroupId();
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Adds an operator to the deploy list of the request associated with a container, if any.
+   * @param containerId external id of the container whose request should track the operator
+   * @param oper operator to deploy
+   */
+  public void addOperatorToDeploy(String containerId, PTOperator oper)
+  {
+    GroupingRequest request = getGroupingRequest(containerId);
+    if (request != null) {
+      request.addOperatorToDeploy(oper.getId());
+    }
+  }
+
+  /**
+   * Removes an operator from the deploy list of the first request that contains it.
+   * @param operatorId physical operator id
+   * @return result of {@link GroupingRequest#removeOperatorToDeploy}, or false if no request lists it
+   */
+  public boolean removeOperatorFromGroupingRequest(int operatorId)
+  {
+    for (GroupingRequest request : groupingRequests.values()) {
+      if (request.getOperatorsToDeploy().contains(operatorId)) {
+        return request.removeOperatorToDeploy(operatorId);
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Removes grouping requests that have no more operators pending deploy or undeploy.
+   * Each removed request is logged at INFO level.
+   */
+  public void removeProcessedGroupingRequests()
+  {
+    // remove via the iterator: Map.remove() inside a for-each over entrySet() fails with
+    // ConcurrentModificationException as soon as the loop advances past the removed entry
+    java.util.Iterator<Entry<String, GroupingRequest>> it = groupingRequests.entrySet().iterator();
+    while (it.hasNext()) {
+      Entry<String, GroupingRequest> request = it.next();
+      if (request.getValue().getOperatorsToDeploy().isEmpty()
+          && request.getValue().getOperatorsToUndeploy().isEmpty()) {
+        LOG.info("Removing for :{}", request.getKey());
+        it.remove();
+      }
+    }
+  }
+
+  /**
+   * Creates (or extends) the grouping request of a container so that deploy/undeploy events of
+   * related containers and operators share one groupId. Affected operators start in the
+   * operatorsToUndeploy list and move to operatorsToDeploy as they go through the redeploy cycle.
+   * @param containerId external id of the container that triggered the request
+   * @param affectedOperators operators to undeploy/redeploy; may be empty
+   * @return the new or existing grouping request of the container
+   */
+  public GroupingRequest addOrModifyGroupingRequest(String containerId, Set<PTOperator> affectedOperators)
+  {
+    GroupingRequest request = groupingRequests.get(containerId);
+    if (request == null) {
+      request = new GroupingRequest();
+      groupingRequests.put(containerId, request);
+    }
+    for (PTOperator oper : affectedOperators) {
+      request.addOperatorToUndeploy(oper.getId());
+      request.addAffectedContainer(oper.getContainer().getExternalId());
+    }
+    return request;
+  }
+
+  /**
+   * Adds affectedContainerId to the grouping request of groupLeaderContainerId, e.g. when a
+   * container is allocated as part of the redeploy triggered by the group leader.
+   * @param groupLeaderContainerId external id of the container that owns the request
+   * @param affectedContainerId id of the container to add; ignored if either id is null
+   */
+  public void addNewContainerToGroupingRequest(String groupLeaderContainerId, String affectedContainerId)
+  {
+    if (groupLeaderContainerId != null && affectedContainerId != null) {
+      GroupingRequest request = getGroupingRequest(groupLeaderContainerId);
+      if (request != null) {
+        request.addAffectedContainer(affectedContainerId);
+      }
+    }
+  }
+
+  /**
+   * When an operator changes state from PENDING_UNDEPLOY to PENDING_DEPLOY, moves it from
+   * operatorsToUndeploy to operatorsToDeploy in every request that lists it for undeploy.
+   * @param oper operator that is about to be redeployed
+   * @return groupId of the last matching request visited, or null if no request lists the operator
+   */
+  public EventGroupId moveOperatorFromUndeployListToDeployList(PTOperator oper)
+  {
+    EventGroupId groupId = null;
+    for (GroupingRequest request : groupingRequests.values()) {
+      if (request.getOperatorsToUndeploy().contains(oper.getId())) {
+        groupId = request.getEventGroupId();
+        request.removeOperatorToUndeploy(oper.getId());
+        request.addOperatorToDeploy(oper.getId());
+      }
+    }
+    return groupId;
+  }
+
+  /**
+   * Clears all grouping requests.
+   */
+  public void clearAllGroupingRequests()
+  {
+    groupingRequests.clear();
+  }
+
+  private static final Logger LOG = LoggerFactory.getLogger(GroupingManager.class);
+}

http://git-wip-us.apache.org/repos/asf/apex-core/blob/2ba60844/engine/src/main/java/org/apache/apex/stram/GroupingRequest.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/org/apache/apex/stram/GroupingRequest.java b/engine/src/main/java/org/apache/apex/stram/GroupingRequest.java
new file mode 100644
index 0000000..d107a38
--- /dev/null
+++ b/engine/src/main/java/org/apache/apex/stram/GroupingRequest.java
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.stram;
+
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.collect.Sets;
+
+import com.datatorrent.stram.util.AbstractWritableAdapter;
+
+/**
+ * Grouping request keeps track of operators whose start/stop events should be grouped,
+ * together with the containers affected by the same cause. All events of one request
+ * share a single {@link EventGroupId}.
+ */
+public class GroupingRequest
+{
+  private final EventGroupId eventGroupId;
+  private final Set<Integer> operatorsToDeploy = Sets.newHashSet();
+  private final Set<Integer> operatorsToUndeploy = Sets.newHashSet();
+  private final Set<String> affectedContainers = Sets.newHashSet();
+
+  /**
+   * Creates a grouping request with a freshly allocated event group id.
+   */
+  public GroupingRequest()
+  {
+    eventGroupId = EventGroupId.newEventGroupId();
+  }
+
+  /**
+   * Creates a grouping request that uses the given event group id.
+   * @param groupId event group id shared by all events of this request
+   */
+  public GroupingRequest(EventGroupId groupId)
+  {
+    this.eventGroupId = groupId;
+  }
+
+  /**
+   * Gets the event group id shared by all events of this request.
+   * @return eventGroupId
+   */
+  public EventGroupId getEventGroupId()
+  {
+    return eventGroupId;
+  }
+
+  /**
+   * Gets the ids of operators to deploy as part of this request.
+   * The returned set is the live internal set; changes to it affect this request.
+   * @return operatorsToDeploy
+   */
+  public Set<Integer> getOperatorsToDeploy()
+  {
+    return operatorsToDeploy;
+  }
+
+  /**
+   * Gets the ids of operators to undeploy as part of this request.
+   * The returned set is the live internal set; changes to it affect this request.
+   * @return operatorsToUndeploy
+   */
+  public Set<Integer> getOperatorsToUndeploy()
+  {
+    return operatorsToUndeploy;
+  }
+
+  /**
+   * Gets the ids of containers affected by this request.
+   * The returned set is the live internal set; changes to it affect this request.
+   * @return affectedContainers
+   */
+  public Set<String> getAffectedContainers()
+  {
+    return affectedContainers;
+  }
+
+  /**
+   * Adds an operator to this request's set of operators to deploy.
+   * @param operatorId id of the operator to add
+   */
+  public void addOperatorToDeploy(int operatorId)
+  {
+    operatorsToDeploy.add(operatorId);
+  }
+
+  /**
+   * Removes an operator from this request's set of operators to deploy.
+   * @param operatorId id of the operator to remove
+   * @return true if the operator was in the set, false otherwise
+   */
+  public boolean removeOperatorToDeploy(int operatorId)
+  {
+    return operatorsToDeploy.remove(operatorId);
+  }
+
+  /**
+   * Adds an operator to this request's set of operators to undeploy.
+   * @param operatorId id of the operator to add
+   */
+  public void addOperatorToUndeploy(int operatorId)
+  {
+    operatorsToUndeploy.add(operatorId);
+  }
+
+  /**
+   * Removes an operator from this request's set of operators to undeploy.
+   * @param operatorId id of the operator to remove
+   * @return true if the operator was in the set, false otherwise
+   */
+  public boolean removeOperatorToUndeploy(int operatorId)
+  {
+    return operatorsToUndeploy.remove(operatorId);
+  }
+
+  /**
+   * Adds a container to this request's set of affected containers.
+   * @param containerId id of the affected container
+   */
+  public void addAffectedContainer(String containerId)
+  {
+    affectedContainers.add(containerId);
+  }
+
+  /**
+   * EventGroupId is used to club relevant events. Events triggered by a common
+   * cause are considered relevant events and share the same id. Two ids are
+   * equal exactly when their numeric group ids are equal.
+   */
+  public static class EventGroupId extends AbstractWritableAdapter
+  {
+    private static final long serialVersionUID = 1L;
+    // static counter; incrementAndGet makes concurrent allocations distinct
+    private static final AtomicInteger idSequence = new AtomicInteger();
+    // not final: assigned by newEventGroupId() after construction
+    private int groupId;
+
+    /**
+     * Allocates a new id with the next value of the static counter.
+     * @return newly allocated event group id
+     */
+    public static EventGroupId newEventGroupId()
+    {
+      EventGroupId id = new EventGroupId();
+      id.groupId = idSequence.incrementAndGet();
+      return id;
+    }
+
+    @Override
+    public int hashCode()
+    {
+      final int prime = 31;
+      int result = 1;
+      result = prime * result + groupId;
+      return result;
+    }
+
+    @Override
+    public boolean equals(Object obj)
+    {
+      if (this == obj) {
+        return true;
+      }
+      if (obj == null || getClass() != obj.getClass()) {
+        return false;
+      }
+      return groupId == ((EventGroupId)obj).groupId;
+    }
+
+    @Override
+    public String toString()
+    {
+      return "EventGroupId [groupId=" + groupId + "]";
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-core/blob/2ba60844/engine/src/test/java/org/apach/apex/stram/GroupingManagerTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/org/apach/apex/stram/GroupingManagerTest.java b/engine/src/test/java/org/apach/apex/stram/GroupingManagerTest.java
new file mode 100644
index 0000000..7e33e97
--- /dev/null
+++ b/engine/src/test/java/org/apach/apex/stram/GroupingManagerTest.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apach.apex.stram;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import org.apache.apex.stram.GroupingManager;
+import org.apache.apex.stram.GroupingRequest;
+
+import com.google.common.collect.ImmutableSet;
+
+import com.datatorrent.stram.plan.physical.PTContainer;
+import com.datatorrent.stram.plan.physical.PTOperator;
+
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for {@link GroupingManager}.
+ */
+public class GroupingManagerTest
+{
+  @Mock
+  private PTOperator oper1;
+  @Mock
+  private PTOperator oper2;
+  @Mock
+  private PTContainer testContainer;
+  private final String affectedContainerId = "container_4";
+  private GroupingManager underTest;
+
+  @Before
+  public void setup()
+  {
+    // NOTE(review): getGroupingManagerInstance() may hand out a shared instance;
+    // teardown() clears its grouping requests so tests stay independent.
+    underTest = GroupingManager.getGroupingManagerInstance();
+    MockitoAnnotations.initMocks(this);
+
+    // both operators are placed in the same mocked container
+    when(oper1.getId()).thenReturn(1);
+    when(oper2.getId()).thenReturn(2);
+    when(oper1.getContainer()).thenReturn(testContainer);
+    when(oper2.getContainer()).thenReturn(testContainer);
+    when(testContainer.getExternalId()).thenReturn(affectedContainerId);
+  }
+
+  @Test
+  public void testAddNewDeploy()
+  {
+    String failedContainerId = "container_1";
+    underTest.addOrModifyGroupingRequest(failedContainerId, ImmutableSet.of(oper1, oper2));
+    Assert.assertEquals(1, underTest.getGroupingRequests().size());
+    GroupingRequest request = underTest.getGroupingRequest(failedContainerId);
+    Assert.assertNotNull(request);
+    Assert.assertTrue(request.getAffectedContainers().contains(affectedContainerId));
+    Assert.assertTrue(request.getOperatorsToUndeploy().contains(oper1.getId()));
+    Assert.assertTrue(request.getOperatorsToUndeploy().contains(oper2.getId()));
+  }
+
+  @Test
+  public void testAddOperatorToGroupingRequest()
+  {
+    String failedContainerId = "container_1";
+    underTest.addOrModifyGroupingRequest(failedContainerId, ImmutableSet.of(oper1));
+    GroupingRequest request = underTest.getGroupingRequest(failedContainerId);
+    Assert.assertNotNull(request);
+    Assert.assertFalse(request.getOperatorsToDeploy().contains(oper2.getId()));
+    underTest.addOperatorToDeploy(failedContainerId, oper2);
+    Assert.assertTrue(request.getOperatorsToDeploy().contains(oper2.getId()));
+  }
+
+  @Test
+  public void testGetDeployGroupIdForContainer()
+  {
+    String failedContainerId = "container_1";
+    underTest.addOrModifyGroupingRequest(failedContainerId, ImmutableSet.of(oper1));
+    GroupingRequest request = underTest.getGroupingRequest(failedContainerId);
+    Assert.assertNotNull(request);
+
+    Assert.assertEquals(request.getEventGroupId(), underTest.getEventGroupIdForContainer(failedContainerId));
+  }
+
+  @Test
+  public void testGetDeployGroupIdForOperator()
+  {
+    String failedContainerId = "container_1";
+    underTest.addOrModifyGroupingRequest(failedContainerId, ImmutableSet.of(oper1));
+    // simulate the operator moving from PENDING_UNDEPLOY to PENDING_DEPLOY state
+    underTest.addOperatorToDeploy(failedContainerId, oper1);
+    GroupingRequest request = underTest.getGroupingRequest(failedContainerId);
+    Assert.assertNotNull(request);
+
+    Assert.assertEquals(request.getEventGroupId(), underTest.getEventGroupIdForOperatorToDeploy(oper1.getId()));
+  }
+
+  @Test
+  public void testMoveOperatorFromUndeployListToDeployList()
+  {
+    String failedContainerId = "container_1";
+    underTest.addOrModifyGroupingRequest(failedContainerId, ImmutableSet.of(oper1));
+    underTest.moveOperatorFromUndeployListToDeployList(oper1);
+    GroupingRequest request = underTest.getGroupingRequest(failedContainerId);
+    Assert.assertNotNull(request);
+
+    Assert.assertFalse(request.getOperatorsToUndeploy().contains(oper1.getId()));
+    Assert.assertTrue(request.getOperatorsToDeploy().contains(oper1.getId()));
+  }
+
+  @Test
+  public void testAddNewContainerToGroupingRequest()
+  {
+    String groupLeaderContainerId = "container_1";
+    String newAffectedContainerId = "container_11";
+    underTest.addOrModifyGroupingRequest(groupLeaderContainerId, ImmutableSet.of(oper1));
+    underTest.addNewContainerToGroupingRequest(groupLeaderContainerId, newAffectedContainerId);
+
+    GroupingRequest request = underTest.getGroupingRequest(groupLeaderContainerId);
+    Assert.assertNotNull(request);
+    Assert.assertTrue(request.getAffectedContainers().contains(newAffectedContainerId));
+  }
+
+  @Test
+  public void testRemoveProcessedGroupingRequest()
+  {
+    underTest.addOrModifyGroupingRequest(affectedContainerId, ImmutableSet.of(oper1));
+    Assert.assertEquals(1, underTest.getGroupingRequests().size());
+    // move the operator from the undeploy list to the deploy list
+    underTest.moveOperatorFromUndeployListToDeployList(oper1);
+    underTest.removeOperatorFromGroupingRequest(oper1.getId());
+    underTest.removeProcessedGroupingRequests();
+    Assert.assertEquals(0, underTest.getGroupingRequests().size());
+  }
+
+  @After
+  public void teardown()
+  {
+    underTest.clearAllGroupingRequests();
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-core/blob/2ba60844/engine/src/test/java/org/apach/apex/stram/GroupingRequestTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/org/apach/apex/stram/GroupingRequestTest.java b/engine/src/test/java/org/apach/apex/stram/GroupingRequestTest.java
new file mode 100644
index 0000000..3417715
--- /dev/null
+++ b/engine/src/test/java/org/apach/apex/stram/GroupingRequestTest.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apach.apex.stram;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.apex.stram.GroupingRequest;
+
+/**
+ * Unit tests for {@link GroupingRequest}.
+ */
+public class GroupingRequestTest
+{
+  private GroupingRequest underTest;
+
+  @Before
+  public void setup()
+  {
+    underTest = new GroupingRequest();
+  }
+
+  @Test
+  public void testAddAffectedContainer()
+  {
+    String affectedContainerId = "container_000001";
+    underTest.addAffectedContainer(affectedContainerId);
+    Assert.assertTrue(underTest.getAffectedContainers().contains(affectedContainerId));
+  }
+
+  @Test
+  public void testAddOperatorToUndeploy()
+  {
+    int operatorId = 1;
+    underTest.addOperatorToUndeploy(operatorId);
+    Assert.assertTrue(underTest.getOperatorsToUndeploy().contains(operatorId));
+  }
+
+  @Test
+  public void testAddOperatorToDeploy()
+  {
+    int operatorId = 1;
+    underTest.addOperatorToDeploy(operatorId);
+    Assert.assertTrue(underTest.getOperatorsToDeploy().contains(operatorId));
+  }
+
+  @Test
+  public void testRemoveOperatorToUndeploy()
+  {
+    int operatorId = 1;
+    underTest.addOperatorToUndeploy(operatorId);
+    Assert.assertTrue(underTest.getOperatorsToUndeploy().contains(operatorId));
+    Assert.assertTrue(underTest.removeOperatorToUndeploy(operatorId));
+    Assert.assertFalse(underTest.getOperatorsToUndeploy().contains(operatorId));
+    // removing an operator that is no longer present reports false
+    Assert.assertFalse(underTest.removeOperatorToUndeploy(operatorId));
+  }
+
+  @Test
+  public void testRemoveOperatorToDeploy()
+  {
+    int operatorId = 1;
+    underTest.addOperatorToDeploy(operatorId);
+    Assert.assertTrue(underTest.getOperatorsToDeploy().contains(operatorId));
+    Assert.assertTrue(underTest.removeOperatorToDeploy(operatorId));
+    Assert.assertFalse(underTest.getOperatorsToDeploy().contains(operatorId));
+    // removing an operator that is no longer present reports false
+    Assert.assertFalse(underTest.removeOperatorToDeploy(operatorId));
+  }
+
+  @Test
+  public void testEventGroupIds()
+  {
+    // the no-arg constructor allocates a fresh id for every request
+    GroupingRequest other = new GroupingRequest();
+    Assert.assertNotNull(underTest.getEventGroupId());
+    Assert.assertFalse(underTest.getEventGroupId().equals(other.getEventGroupId()));
+
+    // the explicit-id constructor reuses the given id
+    GroupingRequest sameGroup = new GroupingRequest(underTest.getEventGroupId());
+    Assert.assertEquals(underTest.getEventGroupId(), sameGroup.getEventGroupId());
+  }
+}


Mime
View raw message