apex-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t..@apache.org
Subject [15/16] incubator-apex-core git commit: APEXCORE-423 style fix for apex-core engine
Date Sat, 09 Apr 2016 03:52:33 GMT
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/85db448e/engine/src/main/java/com/datatorrent/stram/StramUtils.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StramUtils.java b/engine/src/main/java/com/datatorrent/stram/StramUtils.java
index c25480e..8b413bc 100644
--- a/engine/src/main/java/com/datatorrent/stram/StramUtils.java
+++ b/engine/src/main/java/com/datatorrent/stram/StramUtils.java
@@ -40,8 +40,7 @@ public abstract class StramUtils
     try {
       //return Class.forName(className).asSubclass(superClass);
       return Thread.currentThread().getContextClassLoader().loadClass(className).asSubclass(superClass);
-    }
-    catch (ClassNotFoundException e) {
+    } catch (ClassNotFoundException e) {
       throw new IllegalArgumentException("Class not found: " + className, e);
     }
   }
@@ -50,16 +49,14 @@ public abstract class StramUtils
   {
     try {
       return clazz.newInstance();
-    }
-    catch (IllegalAccessException e) {
+    } catch (IllegalAccessException e) {
       throw new IllegalArgumentException("Failed to instantiate " + clazz, e);
-    }
-    catch (InstantiationException e) {
+    } catch (InstantiationException e) {
       throw new IllegalArgumentException("Failed to instantiate " + clazz, e);
     }
   }
 
-  public static abstract class YarnContainerMain
+  public abstract static class YarnContainerMain
   {
     static {
       // set system properties so they can be used in logger configuration
@@ -72,7 +69,7 @@ public abstract class StramUtils
       System.setProperty("hadoop.log.file", "dt.log");
       if (envs.get("CDH_YARN_HOME") != null) {
         // map logging properties to what CHD expects out of the box
-        String[] keys = new String[] { "log.dir", "log.file", "root.logger" };
+        String[] keys = new String[]{"log.dir", "log.file", "root.logger"};
         for (String key : keys) {
           String v = System.getProperty("hadoop." + key);
           if (v != null) {

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/85db448e/engine/src/main/java/com/datatorrent/stram/StreamingAppMaster.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingAppMaster.java b/engine/src/main/java/com/datatorrent/stram/StreamingAppMaster.java
index e060acf..7598b4f 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingAppMaster.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingAppMaster.java
@@ -21,6 +21,9 @@ package com.datatorrent.stram;
 import java.io.StringWriter;
 import java.util.Map;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.GnuParser;
 import org.apache.commons.cli.HelpFormatter;
@@ -32,8 +35,6 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.Records;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import com.datatorrent.stram.debug.StdOutErrLog;
 import com.datatorrent.stram.util.VersionInfo;

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/85db448e/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
index 075836a..1c7c893 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
@@ -123,7 +123,7 @@ public class StreamingAppMasterService extends CompositeService
   private static final long DELEGATION_TOKEN_MAX_LIFETIME = Long.MAX_VALUE / 2;
   private static final long DELEGATION_TOKEN_RENEW_INTERVAL = Long.MAX_VALUE / 2;
   private static final long DELEGATION_TOKEN_REMOVER_SCAN_INTERVAL = 24 * 60 * 60 * 1000;
-  private static final int UPDATE_NODE_REPORTS_INTERVAL = 10* 60 * 1000;
+  private static final int UPDATE_NODE_REPORTS_INTERVAL = 10 * 60 * 1000;
   private AMRMClient<ContainerRequest> amRmClient;
   private NMClientAsync nmClient;
   private LogicalPlan dag;
@@ -145,7 +145,7 @@ public class StreamingAppMasterService extends CompositeService
   private final Map<String, NodeFailureStats> failedContainerNodesMap = Maps.newHashMap();
   // Count of failed containers
   private final AtomicInteger numFailedContainers = new AtomicInteger();
-  private final ConcurrentLinkedQueue<Runnable> pendingTasks = new ConcurrentLinkedQueue<Runnable>();
+  private final ConcurrentLinkedQueue<Runnable> pendingTasks = new ConcurrentLinkedQueue<>();
   // child container callback
   private StreamingContainerParent heartbeatListener;
   private StreamingContainerManager dnmgr;
@@ -600,9 +600,9 @@ public class StreamingAppMasterService extends CompositeService
     // start web service
     try {
       org.mortbay.log.Log.setLog(null);
-    }
-    catch (Throwable throwable) {
-      // SPOI-2687. As part of Pivotal Certification, we need to catch ClassNotFoundException as Pivotal was using Jetty 7 where as other distros are using Jetty 6.
+    } catch (Throwable throwable) {
+      // SPOI-2687. As part of Pivotal Certification, we need to catch ClassNotFoundException as Pivotal was using
+      // Jetty 7 where as other distros are using Jetty 6.
       // LOG.error("can't set the log to null: ", throwable);
     }
 
@@ -616,8 +616,7 @@ public class StreamingAppMasterService extends CompositeService
       LOG.info("Started web service at port: " + webApp.port());
       this.appMasterTrackingUrl = NetUtils.getConnectAddress(webApp.getListenerAddress()).getHostName() + ":" + webApp.port();
       LOG.info("Setting tracking URL to: " + appMasterTrackingUrl);
-    }
-    catch (Exception e) {
+    } catch (Exception e) {
       LOG.error("Webapps failed to start. Ignoring for now:", e);
     }
   }
@@ -646,8 +645,7 @@ public class StreamingAppMasterService extends CompositeService
     try {
       StreamingContainer.eventloop.start();
       execute();
-    }
-    finally {
+    } finally {
       StreamingContainer.eventloop.stop();
     }
     return status;
@@ -700,7 +698,7 @@ public class StreamingAppMasterService extends CompositeService
 
     int loopCounter = -1;
     long nodeReportUpdateTime = 0;
-    List<ContainerId> releasedContainers = new ArrayList<ContainerId>();
+    List<ContainerId> releasedContainers = new ArrayList<>();
     int numTotalContainers = 0;
     // keep track of already requested containers to not request them again while waiting for allocation
     int numRequestedContainers = 0;
@@ -723,7 +721,7 @@ public class StreamingAppMasterService extends CompositeService
       if (ar != null) {
         appDone = true;
         dnmgr.shutdownDiagnosticsMessage = String.format("Application master failed due to application %s with duplicate application name \"%s\" by the same user \"%s\" is already started.",
-          ar.getApplicationId().toString(), ar.getName(), ar.getUser());
+            ar.getApplicationId().toString(), ar.getName(), ar.getUser());
         LOG.info("Forced shutdown due to {}", dnmgr.shutdownDiagnosticsMessage);
         finishApplication(FinalApplicationStatus.FAILED, numTotalContainers);
         return;
@@ -741,8 +739,8 @@ public class StreamingAppMasterService extends CompositeService
     checkContainerStatus();
     FinalApplicationStatus finalStatus = FinalApplicationStatus.SUCCEEDED;
     final InetSocketAddress rmAddress = conf.getSocketAddr(YarnConfiguration.RM_ADDRESS,
-      YarnConfiguration.DEFAULT_RM_ADDRESS,
-      YarnConfiguration.DEFAULT_RM_PORT);
+        YarnConfiguration.DEFAULT_RM_ADDRESS,
+        YarnConfiguration.DEFAULT_RM_PORT);
 
     while (!appDone) {
       loopCounter++;
@@ -779,8 +777,8 @@ public class StreamingAppMasterService extends CompositeService
       }
 
       // Setup request to be sent to RM to allocate containers
-      List<ContainerRequest> containerRequests = new ArrayList<ContainerRequest>();
-      List<ContainerRequest> removedContainerRequests = new ArrayList<ContainerRequest>();
+      List<ContainerRequest> containerRequests = new ArrayList<>();
+      List<ContainerRequest> removedContainerRequests = new ArrayList<>();
 
       // request containers for pending deploy requests
       if (!dnmgr.containerStartRequests.isEmpty()) {
@@ -826,7 +824,7 @@ public class StreamingAppMasterService extends CompositeService
       resourceRequestor.reissueContainerRequests(amRmClient, requestedResources, loopCounter, resourceRequestor, containerRequests, removedContainerRequests);
 
      /* Remove nodes from blacklist after timeout */
-      List<String> blacklistRemovals = new ArrayList<String>();
+      List<String> blacklistRemovals = new ArrayList<>();
       for (String hostname : failedBlackListedNodes) {
         Long timeDiff = currentTimeMillis - failedContainerNodesMap.get(hostname).blackListAdditionTime;
         if (timeDiff >= blacklistRemovalTime) {
@@ -895,8 +893,7 @@ public class StreamingAppMasterService extends CompositeService
           // allocated container no longer needed, add release request
           LOG.warn("Container {} allocated but nothing to deploy, going to release this container.", allocatedContainer.getId());
           releasedContainers.add(allocatedContainer.getId());
-        }
-        else {
+        } else {
           AllocatedContainer allocatedContainerHolder = new AllocatedContainer(allocatedContainer);
           this.allocatedContainers.put(allocatedContainer.getId().toString(), allocatedContainerHolder);
           ByteBuffer tokens = null;
@@ -927,7 +924,7 @@ public class StreamingAppMasterService extends CompositeService
       // Check the completed containers
       List<ContainerStatus> completedContainers = amResp.getCompletedContainersStatuses();
       // LOG.debug("Got response from RM for container ask, completedCnt=" + completedContainers.size());
-      List<String> blacklistAdditions = new ArrayList<String>();
+      List<String> blacklistAdditions = new ArrayList<>();
       for (ContainerStatus containerStatus : completedContainers) {
         LOG.info("Completed containerId=" + containerStatus.getContainerId() + ", state=" + containerStatus.getState() + ", exitStatus=" + containerStatus.getExitStatus() + ", diagnostics=" + containerStatus.getDiagnostics());
 
@@ -983,8 +980,7 @@ public class StreamingAppMasterService extends CompositeService
           LOG.debug("Container {} failed or killed.", containerStatus.getContainerId());
           dnmgr.scheduleContainerRestart(containerStatus.getContainerId().toString());
 //          }
-        }
-        else {
+        } else {
           // container completed successfully
           numCompletedContainers.incrementAndGet();
           LOG.info("Container completed successfully." + ", containerId=" + containerStatus.getContainerId());
@@ -1017,8 +1013,7 @@ public class StreamingAppMasterService extends CompositeService
         LOG.info("Forced shutdown due to {}", dnmgr.shutdownDiagnosticsMessage);
         finalStatus = FinalApplicationStatus.FAILED;
         appDone = true;
-      }
-      else if (allocatedContainers.isEmpty() && numRequestedContainers == 0 && dnmgr.containerStartRequests.isEmpty()) {
+      } else if (allocatedContainers.isEmpty() && numRequestedContainers == 0 && dnmgr.containerStartRequests.isEmpty()) {
         LOG.debug("Exiting as no more containers are allocated or requested");
         finalStatus = FinalApplicationStatus.SUCCEEDED;
         appDone = true;
@@ -1058,7 +1053,7 @@ public class StreamingAppMasterService extends CompositeService
   {
     StramDelegationTokenIdentifier identifier = new StramDelegationTokenIdentifier(new Text(username), new Text(""), new Text(""));
     String service = address.getAddress().getHostAddress() + ":" + address.getPort();
-    Token<StramDelegationTokenIdentifier> stramToken = new Token<StramDelegationTokenIdentifier>(identifier, delegationTokenManager);
+    Token<StramDelegationTokenIdentifier> stramToken = new Token<>(identifier, delegationTokenManager);
     stramToken.setService(new Text(service));
     return stramToken;
   }
@@ -1198,7 +1193,7 @@ public class StreamingAppMasterService extends CompositeService
 
   private class AllocatedContainer
   {
-    final private Container container;
+    private final Container container;
     private boolean stopRequested;
     private Token<StramDelegationTokenIdentifier> delegationToken;
 

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/85db448e/engine/src/main/java/com/datatorrent/stram/StreamingContainerAgent.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerAgent.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerAgent.java
index 81dc96e..fc2fb17 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerAgent.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerAgent.java
@@ -19,17 +19,23 @@
 package com.datatorrent.stram;
 
 import java.net.InetSocketAddress;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
-import com.google.common.collect.Sets;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 
+import com.google.common.collect.Sets;
+
 import com.datatorrent.api.Context.PortContext;
 import com.datatorrent.api.DAG.Locality;
 import com.datatorrent.api.InputOperator;
@@ -38,7 +44,6 @@ import com.datatorrent.api.Operator.ProcessingMode;
 import com.datatorrent.api.StorageAgent;
 import com.datatorrent.api.StreamCodec;
 import com.datatorrent.api.annotation.Stateless;
-
 import com.datatorrent.stram.api.Checkpoint;
 import com.datatorrent.stram.api.OperatorDeployInfo;
 import com.datatorrent.stram.api.OperatorDeployInfo.InputDeployInfo;
@@ -67,19 +72,23 @@ import com.datatorrent.stram.webapp.ContainerInfo;
  *
  * @since 0.3.2
  */
-public class StreamingContainerAgent {
+public class StreamingContainerAgent
+{
   private static final Logger LOG = LoggerFactory.getLogger(StreamingContainerAgent.class);
 
-  public static class ContainerStartRequest {
+  public static class ContainerStartRequest
+  {
     final PTContainer container;
 
-    ContainerStartRequest(PTContainer container) {
+    ContainerStartRequest(PTContainer container)
+    {
       this.container = container;
     }
   }
 
-
-  public StreamingContainerAgent(PTContainer container, StreamingContainerContext initCtx, StreamingContainerManager dnmgr) {
+  public StreamingContainerAgent(PTContainer container, StreamingContainerContext initCtx, StreamingContainerManager
+      dnmgr)
+  {
     this.container = container;
     this.initCtx = initCtx;
     this.memoryMBFree = this.container.getAllocatedMemoryMB();
@@ -102,9 +111,10 @@ public class StreamingContainerAgent {
   long gcCollectionTime;
   final StreamingContainerManager dnmgr;
 
-  private final ConcurrentLinkedQueue<StramToNodeRequest> operatorRequests = new ConcurrentLinkedQueue<StramToNodeRequest>();
+  private final ConcurrentLinkedQueue<StramToNodeRequest> operatorRequests = new ConcurrentLinkedQueue<>();
 
-  public StreamingContainerContext getInitContext() {
+  public StreamingContainerContext getInitContext()
+  {
     return initCtx;
   }
 
@@ -113,7 +123,8 @@ public class StreamingContainerAgent {
     return container;
   }
 
-  public boolean hasPendingWork() {
+  public boolean hasPendingWork()
+  {
     for (PTOperator oper : container.getOperators()) {
       if (oper.getState() == PTOperator.State.PENDING_DEPLOY) {
         return true;
@@ -122,29 +133,33 @@ public class StreamingContainerAgent {
     return false;
   }
 
-  public void addOperatorRequest(StramToNodeRequest r) {
+  public void addOperatorRequest(StramToNodeRequest r)
+  {
     LOG.info("Adding operator request {} {}", container.getExternalId(), r);
     this.operatorRequests.add(r);
   }
 
   @SuppressWarnings("ReturnOfCollectionOrArrayField")
-  protected ConcurrentLinkedQueue<StramToNodeRequest> getOperatorRequests() {
+  protected ConcurrentLinkedQueue<StramToNodeRequest> getOperatorRequests()
+  {
     return this.operatorRequests;
   }
 
   /**
    * Create deploy info for StramChild.
+   *
    * @param operators
    * @return StreamingContainerContext
    */
-  public List<OperatorDeployInfo> getDeployInfoList(Collection<PTOperator> operators) {
+  public List<OperatorDeployInfo> getDeployInfoList(Collection<PTOperator> operators)
+  {
 
     if (container.bufferServerAddress == null) {
       throw new AssertionError("No buffer server address assigned");
     }
 
-    Map<OperatorDeployInfo, PTOperator> nodes = new LinkedHashMap<OperatorDeployInfo, PTOperator>();
-    HashSet<PTOperator.PTOutput> publishers = new HashSet<PTOperator.PTOutput>();
+    Map<OperatorDeployInfo, PTOperator> nodes = new LinkedHashMap<>();
+    HashSet<PTOperator.PTOutput> publishers = new HashSet<>();
 
     PhysicalPlan physicalPlan = dnmgr.getPhysicalPlan();
 
@@ -156,8 +171,8 @@ public class StreamingContainerAgent {
       OperatorDeployInfo ndi = createOperatorDeployInfo(oper);
 
       nodes.put(ndi, oper);
-      ndi.inputs = new ArrayList<InputDeployInfo>(oper.getInputs().size());
-      ndi.outputs = new ArrayList<OutputDeployInfo>(oper.getOutputs().size());
+      ndi.inputs = new ArrayList<>(oper.getInputs().size());
+      ndi.outputs = new ArrayList<>(oper.getOutputs().size());
 
       for (PTOperator.PTOutput out : oper.getOutputs()) {
         final StreamMeta streamMeta = out.logicalStream;
@@ -168,8 +183,7 @@ public class StreamingContainerAgent {
 
         try {
           portInfo.contextAttributes = streamMeta.getSource().getAttributes().clone();
-        }
-        catch (CloneNotSupportedException ex) {
+        } catch (CloneNotSupportedException ex) {
           throw new RuntimeException("Cannot clone attributes", ex);
         }
 
@@ -279,7 +293,7 @@ public class StreamingContainerAgent {
       }
     }
 
-    return new ArrayList<OperatorDeployInfo>(nodes.keySet());
+    return new ArrayList<>(nodes.keySet());
   }
 
   public static InputPortMeta getInputPortMeta(LogicalPlan.OperatorMeta operatorMeta, StreamMeta streamMeta)
@@ -349,7 +363,6 @@ public class StreamingContainerAgent {
    * <p>
    *
    * @return {@link com.datatorrent.stram.api.OperatorDeployInfo}
-   *
    */
   private OperatorDeployInfo createOperatorDeployInfo(PTOperator oper)
   {
@@ -359,13 +372,11 @@ public class StreamingContainerAgent {
       UnifierDeployInfo udi = new UnifierDeployInfo(); /* the constructor auto sets the type */
       try {
         udi.operatorAttributes = oper.getUnifiedOperatorMeta().getAttributes().clone();
-      }
-      catch (CloneNotSupportedException ex) {
+      } catch (CloneNotSupportedException ex) {
         throw new RuntimeException("Cannot clone unifier attributes", ex);
       }
       ndi = udi;
-    }
-    else {
+    } else {
       ndi = new OperatorDeployInfo();
       Operator operator = oper.getOperatorMeta().getOperator();
       if (operator instanceof InputOperator) {
@@ -374,15 +385,14 @@ public class StreamingContainerAgent {
         if (!oper.getInputs().isEmpty()) {
           //If there are no input ports then it has to be an input operator. But if there are input ports then
           //we check if any input port is connected which would make it a Generic operator.
-          for (PTOperator.PTInput ptInput: oper.getInputs()) {
+          for (PTOperator.PTInput ptInput : oper.getInputs()) {
             if (ptInput.logicalStream != null && ptInput.logicalStream.getSource() != null) {
               ndi.type = OperatorType.GENERIC;
               break;
             }
           }
         }
-      }
-      else {
+      } else {
         ndi.type = OperatorType.GENERIC;
       }
     }
@@ -410,8 +420,7 @@ public class StreamingContainerAgent {
         if (checkpoint == null || checkpoint.windowId != checkpointId) {
           checkpoint = new Checkpoint(checkpointId, 0, 0);
         }
-      }
-      catch (Exception e) {
+      } catch (Exception e) {
         throw new RuntimeException("Failed to determine checkpoint window id " + oper, e);
       }
     }
@@ -423,8 +432,7 @@ public class StreamingContainerAgent {
     try {
       // clone map before modifying it
       ndi.contextAttributes = oper.getOperatorMeta().getAttributes().clone();
-    }
-    catch (CloneNotSupportedException ex) {
+    } catch (CloneNotSupportedException ex) {
       throw new RuntimeException("Cannot clone operator attributes", ex);
     }
     if (oper.isOperatorStateLess()) {
@@ -433,7 +441,8 @@ public class StreamingContainerAgent {
     return ndi;
   }
 
-  public ContainerInfo getContainerInfo() {
+  public ContainerInfo getContainerInfo()
+  {
     ContainerInfo ci = new ContainerInfo();
     ci.id = container.getExternalId();
     ci.host = container.host;
@@ -449,8 +458,12 @@ public class StreamingContainerAgent {
     ci.finishedTime = container.getFinishedTime();
     if (this.container.nodeHttpAddress != null) {
       YarnConfiguration conf = new YarnConfiguration();
-      ci.containerLogsUrl = ConfigUtils.getSchemePrefix(conf) + this.container.nodeHttpAddress + "/node/containerlogs/" + ci.id + "/" + System.getenv(ApplicationConstants.Environment.USER.toString());
-      ci.rawContainerLogsUrl = ConfigUtils.getRawContainerLogsUrl(conf, container.nodeHttpAddress, container.getPlan().getLogicalPlan().getAttributes().get(LogicalPlan.APPLICATION_ID), ci.id);
+      ci.containerLogsUrl = ConfigUtils
+          .getSchemePrefix(conf) + this.container.nodeHttpAddress + "/node/containerlogs/" + ci.id + "/" + System
+          .getenv(ApplicationConstants.Environment.USER.toString());
+      ci.rawContainerLogsUrl = ConfigUtils
+          .getRawContainerLogsUrl(conf, container.nodeHttpAddress, container.getPlan().getLogicalPlan().getAttributes()
+              .get(LogicalPlan.APPLICATION_ID), ci.id);
     }
     return ci;
   }

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/85db448e/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
index eb3fb96..9e184e7 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
@@ -18,34 +18,47 @@
  */
 package com.datatorrent.stram;
 
-import java.io.*;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
 import java.lang.management.ManagementFactory;
 import java.lang.reflect.Field;
 import java.net.InetSocketAddress;
 import java.net.URI;
-import java.util.*;
-import java.util.concurrent.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
 import javax.annotation.Nullable;
 
-import com.datatorrent.netlet.util.DTThrowable;
-import com.esotericsoftware.kryo.KryoException;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Objects;
-import com.google.common.base.Predicate;
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
-import net.engio.mbassy.bus.MBassador;
-import net.engio.mbassy.bus.config.BusConfiguration;
-
 import org.codehaus.jettison.json.JSONArray;
 import org.codehaus.jettison.json.JSONException;
 import org.codehaus.jettison.json.JSONObject;
@@ -72,13 +85,33 @@ import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.hadoop.yarn.webapp.NotFoundException;
 
-import com.datatorrent.api.*;
+import com.esotericsoftware.kryo.KryoException;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Objects;
+import com.google.common.base.Predicate;
+import com.google.common.base.Throwables;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.Component;
+import com.datatorrent.api.Context;
 import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.Operator;
 import com.datatorrent.api.Operator.InputPort;
 import com.datatorrent.api.Operator.OutputPort;
 import com.datatorrent.api.Stats.OperatorStats;
+import com.datatorrent.api.StatsListener;
+import com.datatorrent.api.StorageAgent;
+import com.datatorrent.api.StreamCodec;
+import com.datatorrent.api.StringCodec;
 import com.datatorrent.api.annotation.Stateless;
-
 import com.datatorrent.bufferserver.auth.AuthManager;
 import com.datatorrent.bufferserver.util.Codec;
 import com.datatorrent.common.experimental.AppData;
@@ -88,8 +121,21 @@ import com.datatorrent.common.util.NumberAggregate;
 import com.datatorrent.common.util.Pair;
 import com.datatorrent.stram.Journal.Recoverable;
 import com.datatorrent.stram.StreamingContainerAgent.ContainerStartRequest;
-import com.datatorrent.stram.api.*;
-import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.*;
+import com.datatorrent.stram.api.AppDataSource;
+import com.datatorrent.stram.api.Checkpoint;
+import com.datatorrent.stram.api.ContainerContext;
+import com.datatorrent.stram.api.OperatorDeployInfo;
+import com.datatorrent.stram.api.StramEvent;
+import com.datatorrent.stram.api.StramToNodeChangeLoggersRequest;
+import com.datatorrent.stram.api.StramToNodeGetPropertyRequest;
+import com.datatorrent.stram.api.StramToNodeSetPropertyRequest;
+import com.datatorrent.stram.api.StramToNodeStartRecordingRequest;
+import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.ContainerHeartbeat;
+import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.ContainerHeartbeatResponse;
+import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.ContainerStats;
+import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.OperatorHeartbeat;
+import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.StramToNodeRequest;
+import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.StreamingContainerContext;
 import com.datatorrent.stram.engine.OperatorResponse;
 import com.datatorrent.stram.engine.StreamingContainer;
 import com.datatorrent.stram.engine.WindowGenerator;
@@ -103,18 +149,30 @@ import com.datatorrent.stram.plan.logical.LogicalPlanConfiguration;
 import com.datatorrent.stram.plan.logical.Operators;
 import com.datatorrent.stram.plan.logical.Operators.PortContextPair;
 import com.datatorrent.stram.plan.logical.requests.LogicalPlanRequest;
-import com.datatorrent.stram.plan.physical.*;
+import com.datatorrent.stram.plan.physical.OperatorStatus;
 import com.datatorrent.stram.plan.physical.OperatorStatus.PortStatus;
+import com.datatorrent.stram.plan.physical.PTContainer;
+import com.datatorrent.stram.plan.physical.PTOperator;
 import com.datatorrent.stram.plan.physical.PTOperator.PTInput;
 import com.datatorrent.stram.plan.physical.PTOperator.PTOutput;
 import com.datatorrent.stram.plan.physical.PTOperator.State;
+import com.datatorrent.stram.plan.physical.PhysicalPlan;
 import com.datatorrent.stram.plan.physical.PhysicalPlan.PlanContext;
+import com.datatorrent.stram.plan.physical.PlanModifier;
 import com.datatorrent.stram.util.ConfigUtils;
 import com.datatorrent.stram.util.FSJsonLineFile;
 import com.datatorrent.stram.util.MovingAverage.MovingAverageLong;
 import com.datatorrent.stram.util.SharedPubSubWebSocketClient;
 import com.datatorrent.stram.util.WebServicesClient;
-import com.datatorrent.stram.webapp.*;
+import com.datatorrent.stram.webapp.ContainerInfo;
+import com.datatorrent.stram.webapp.LogicalOperatorInfo;
+import com.datatorrent.stram.webapp.OperatorAggregationInfo;
+import com.datatorrent.stram.webapp.OperatorInfo;
+import com.datatorrent.stram.webapp.PortInfo;
+import com.datatorrent.stram.webapp.StreamInfo;
+
+import net.engio.mbassy.bus.MBassador;
+import net.engio.mbassy.bus.config.BusConfiguration;
 
 /**
  * Tracks topology provisioning/allocation to containers<p>
@@ -129,20 +187,19 @@ import com.datatorrent.stram.webapp.*;
  */
 public class StreamingContainerManager implements PlanContext
 {
-  private final static Logger LOG = LoggerFactory.getLogger(StreamingContainerManager.class);
-  public final static String GATEWAY_LOGIN_URL_PATH = "/ws/v2/login";
-  public final static String BUILTIN_APPDATA_URL = "builtin";
-  public final static String CONTAINERS_INFO_FILENAME_FORMAT = "containers_%d.json";
-  public final static String OPERATORS_INFO_FILENAME_FORMAT = "operators_%d.json";
-  public final static String APP_META_FILENAME = "meta.json";
-  public final static String APP_META_KEY_ATTRIBUTES = "attributes";
-  public final static String APP_META_KEY_METRICS = "metrics";
+  private static final Logger LOG = LoggerFactory.getLogger(StreamingContainerManager.class);
+  public static final String GATEWAY_LOGIN_URL_PATH = "/ws/v2/login";
+  public static final String BUILTIN_APPDATA_URL = "builtin";
+  public static final String CONTAINERS_INFO_FILENAME_FORMAT = "containers_%d.json";
+  public static final String OPERATORS_INFO_FILENAME_FORMAT = "operators_%d.json";
+  public static final String APP_META_FILENAME = "meta.json";
+  public static final String APP_META_KEY_ATTRIBUTES = "attributes";
+  public static final String APP_META_KEY_METRICS = "metrics";
   public static final String EMBEDDABLE_QUERY_NAME_SUFFIX = ".query";
 
-  public final static long LATENCY_WARNING_THRESHOLD_MILLIS = 10 * 60 * 1000; // 10 minutes
-  public final static Recoverable SET_OPERATOR_PROPERTY = new SetOperatorProperty();
-  public final static Recoverable SET_PHYSICAL_OPERATOR_PROPERTY = new SetPhysicalOperatorProperty();
-  public final static int METRIC_QUEUE_SIZE = 1000;
+  public static final Recoverable SET_OPERATOR_PROPERTY = new SetOperatorProperty();
+  public static final Recoverable SET_PHYSICAL_OPERATOR_PROPERTY = new SetPhysicalOperatorProperty();
+  public static final int METRIC_QUEUE_SIZE = 1000;
 
   private final FinalVars vars;
   private final PhysicalPlan plan;
@@ -150,27 +207,27 @@ public class StreamingContainerManager implements PlanContext
   private SharedPubSubWebSocketClient wsClient;
   private FSStatsRecorder statsRecorder;
   private FSEventRecorder eventRecorder;
-  protected final Map<String, String> containerStopRequests = new ConcurrentHashMap<String, String>();
-  protected final ConcurrentLinkedQueue<ContainerStartRequest> containerStartRequests = new ConcurrentLinkedQueue<ContainerStartRequest>();
+  protected final Map<String, String> containerStopRequests = new ConcurrentHashMap<>();
+  protected final ConcurrentLinkedQueue<ContainerStartRequest> containerStartRequests = new ConcurrentLinkedQueue<>();
   protected boolean forcedShutdown = false;
-  private final ConcurrentLinkedQueue<Runnable> eventQueue = new ConcurrentLinkedQueue<Runnable>();
+  private final ConcurrentLinkedQueue<Runnable> eventQueue = new ConcurrentLinkedQueue<>();
   private final AtomicBoolean eventQueueProcessing = new AtomicBoolean();
   private final HashSet<PTContainer> pendingAllocation = Sets.newLinkedHashSet();
   protected String shutdownDiagnosticsMessage = "";
   private long lastResourceRequest = 0;
-  private final Map<String, StreamingContainerAgent> containers = new ConcurrentHashMap<String, StreamingContainerAgent>();
-  private final List<Pair<PTOperator, Long>> purgeCheckpoints = new ArrayList<Pair<PTOperator, Long>>();
+  private final Map<String, StreamingContainerAgent> containers = new ConcurrentHashMap<>();
+  private final List<Pair<PTOperator, Long>> purgeCheckpoints = new ArrayList<>();
   private Map<OperatorMeta, Set<OperatorMeta>> checkpointGroups;
   private final Map<Long, Set<PTOperator>> shutdownOperators = new HashMap<>();
   private CriticalPathInfo criticalPathInfo;
-  private final ConcurrentMap<PTOperator, PTOperator> reportStats = Maps.newConcurrentMap();
+  private final ConcurrentMap<PTOperator, PTOperator> reportStats = new ConcurrentHashMap<>();
   private final AtomicBoolean deployChangeInProgress = new AtomicBoolean();
   private int deployChangeCnt;
   private MBassador<StramEvent> eventBus; // event bus for publishing stram events
-  final private Journal journal;
+  private final Journal journal;
   private RecoveryHandler recoveryHandler;
   // window id to node id to end window stats
-  private final ConcurrentSkipListMap<Long, Map<Integer, EndWindowStats>> endWindowStatsOperatorMap = new ConcurrentSkipListMap<Long, Map<Integer, EndWindowStats>>();
+  private final ConcurrentSkipListMap<Long, Map<Integer, EndWindowStats>> endWindowStatsOperatorMap = new ConcurrentSkipListMap<>();
   private final ConcurrentMap<PTOperator, PTOperator> slowestUpstreamOp = new ConcurrentHashMap<>();
   private long committedWindowId;
   // (operator id, port name) to timestamp
@@ -179,12 +236,11 @@ public class StreamingContainerManager implements PlanContext
   private long lastStatsTimestamp = System.currentTimeMillis();
   private long currentEndWindowStatsWindowId;
   private long completeEndWindowStatsWindowId;
-  private final ConcurrentHashMap<String, MovingAverageLong> rpcLatencies = new ConcurrentHashMap<String, MovingAverageLong>();
+  private final ConcurrentHashMap<String, MovingAverageLong> rpcLatencies = new ConcurrentHashMap<>();
   private final AtomicLong nodeToStramRequestIds = new AtomicLong(1);
   private int allocatedMemoryMB = 0;
   private List<AppDataSource> appDataSources = null;
   private final Cache<Long, Object> commandResponse = CacheBuilder.newBuilder().expireAfterWrite(1, TimeUnit.MINUTES).build();
-  private long lastLatencyWarningTime;
   private transient ExecutorService poolExecutor;
   private FileContext fileContext;
 
@@ -223,7 +279,7 @@ public class StreamingContainerManager implements PlanContext
   private static class EndWindowStats
   {
     long emitTimestamp = -1;
-    HashMap<String, Long> dequeueTimestamps = new HashMap<String, Long>(); // input port name to end window dequeue time
+    HashMap<String, Long> dequeueTimestamps = new HashMap<>(); // input port name to end window dequeue time
     Object counters;
     Map<String, Object> metrics;
   }
@@ -253,9 +309,9 @@ public class StreamingContainerManager implements PlanContext
 
   private static class SetOperatorProperty implements Recoverable
   {
-    final private String operatorName;
-    final private String propertyName;
-    final private String propertyValue;
+    private final String operatorName;
+    private final String propertyName;
+    private final String propertyValue;
 
     private SetOperatorProperty()
     {
@@ -298,9 +354,9 @@ public class StreamingContainerManager implements PlanContext
 
   private static class SetPhysicalOperatorProperty implements Recoverable
   {
-    final private int operatorId;
-    final private String propertyName;
-    final private String propertyValue;
+    private final int operatorId;
+    private final String propertyName;
+    private final String propertyValue;
 
     private SetPhysicalOperatorProperty()
     {
@@ -357,7 +413,7 @@ public class StreamingContainerManager implements PlanContext
     poolExecutor = Executors.newFixedThreadPool(4);
     // setup prior to plan creation for event recording
     if (enableEventRecording) {
-      this.eventBus = new MBassador<StramEvent>(BusConfiguration.Default(1, 1, 1));
+      this.eventBus = new MBassador<>(BusConfiguration.Default(1, 1, 1));
     }
     this.plan = new PhysicalPlan(dag, this);
     this.journal = new Journal(this);
@@ -370,7 +426,7 @@ public class StreamingContainerManager implements PlanContext
     this.clock = new SystemClock();
     poolExecutor = Executors.newFixedThreadPool(4);
     this.plan = checkpointedState.physicalPlan;
-    this.eventBus = new MBassador<StramEvent>(BusConfiguration.Default(1, 1, 1));
+    this.eventBus = new MBassador<>(BusConfiguration.Default(1, 1, 1));
     this.journal = new Journal(this);
     init(enableEventRecording);
   }
@@ -393,11 +449,12 @@ public class StreamingContainerManager implements PlanContext
       fileName = String.format(OPERATORS_INFO_FILENAME_FORMAT, plan.getLogicalPlan().getValue(LogicalPlan.APPLICATION_ATTEMPT_ID));
       this.operatorFile = new FSJsonLineFile(fileContext, new Path(this.vars.appPath, fileName), FsPermission.getDefault());
     } catch (IOException ex) {
-      throw DTThrowable.wrapIfChecked(ex);
+      throw Throwables.propagate(ex);
     }
   }
 
-  public Journal getJournal() {
+  public Journal getJournal()
+  {
     return journal;
   }
 
@@ -431,8 +488,7 @@ public class StreamingContainerManager implements PlanContext
             } else {
               LOG.warn("Could not determine the memory allocated for the streaming application master.  Node manager is reporting {} MB from {}", totalMemoryNeededMB, url);
             }
-          }
-          catch (Exception ex) {
+          } catch (Exception ex) {
             LOG.warn("Could not determine the memory allocated for the streaming application master", ex);
           }
         }
@@ -501,8 +557,7 @@ public class StreamingContainerManager implements PlanContext
           wsClient.setPassword(gatewayPassword);
         }
         wsClient.setup();
-      }
-      catch (Exception ex) {
+      } catch (Exception ex) {
         LOG.warn("Cannot establish websocket connection to {}", gatewayAddress, ex);
       }
     }
@@ -522,7 +577,7 @@ public class StreamingContainerManager implements PlanContext
 
     IOUtils.closeQuietly(containerFile);
     IOUtils.closeQuietly(operatorFile);
-    if(poolExecutor != null) {
+    if (poolExecutor != null) {
       poolExecutor.shutdown();
     }
   }
@@ -582,7 +637,7 @@ public class StreamingContainerManager implements PlanContext
   {
     synchronized (appDataSourcesLock) {
       if (appDataSources == null) {
-        appDataSources = new ArrayList<AppDataSource>();
+        appDataSources = new ArrayList<>();
         operators:
         for (LogicalPlan.OperatorMeta operatorMeta : plan.getLogicalPlan().getAllOperators()) {
           Map<LogicalPlan.InputPortMeta, LogicalPlan.StreamMeta> inputStreams = operatorMeta.getInputStreams();
@@ -608,7 +663,7 @@ public class StreamingContainerManager implements PlanContext
           }
 
           //Discover separate query operators
-          LOG.warn("DEBUG: looking at operator {} {}", operatorMeta.getName(), Thread.currentThread().getId());
+          LOG.debug("Looking at operator {} {}", operatorMeta.getName(), Thread.currentThread().getId());
           for (Map.Entry<LogicalPlan.InputPortMeta, LogicalPlan.StreamMeta> entry : inputStreams.entrySet()) {
             LogicalPlan.InputPortMeta portMeta = entry.getKey();
             if (portMeta.isAppDataQueryPort()) {
@@ -621,9 +676,8 @@ public class StreamingContainerManager implements PlanContext
                     queryUrl = queryOperator.getAppDataURL();
                     queryTopic = queryOperator.getTopic();
                   } else {
-                    LOG.warn("An embeddable query connector and the {} query operator were discovered. " +
-                             "The query operator will be ignored and the embeddable query connector will be used instead.",
-                             operatorMeta.getName());
+                    LOG.warn("An embeddable query connector and the {} query operator were discovered. The query operator will be ignored and the embeddable query connector will be used instead.",
+                        operatorMeta.getName());
                   }
                 }
               } else {
@@ -635,7 +689,7 @@ public class StreamingContainerManager implements PlanContext
 
           for (Map.Entry<LogicalPlan.OutputPortMeta, LogicalPlan.StreamMeta> entry : outputStreams.entrySet()) {
             LogicalPlan.OutputPortMeta portMeta = entry.getKey();
-            LOG.warn("DEBUG: looking at port {} {}", portMeta.getPortName(), Thread.currentThread().getId());
+            LOG.debug("Looking at port {} {}", portMeta.getPortName(), Thread.currentThread().getId());
 
             if (portMeta.isAppDataResultPort()) {
               AppDataSource appDataSource = new AppDataSource();
@@ -661,7 +715,7 @@ public class StreamingContainerManager implements PlanContext
               }
               OperatorMeta resultOperatorMeta = sinks.get(0).getOperatorWrapper();
               if (resultOperatorMeta.getOperator() instanceof AppData.ConnectionInfoProvider) {
-                AppData.ConnectionInfoProvider resultOperator = (AppData.ConnectionInfoProvider) resultOperatorMeta.getOperator();
+                AppData.ConnectionInfoProvider resultOperator = (AppData.ConnectionInfoProvider)resultOperatorMeta.getOperator();
                 appDataSource.setResultOperatorName(resultOperatorMeta.getName());
                 appDataSource.setResultTopic(resultOperator.getTopic());
                 appDataSource.setResultUrl(convertAppDataUrl(resultOperator.getAppDataURL()));
@@ -673,7 +727,7 @@ public class StreamingContainerManager implements PlanContext
                 LOG.warn("Result operator for the App Data Source {}.{} does not implement the right interface. Ignoring the App Data Source.", operatorMeta.getName(), portMeta.getPortName());
                 continue;
               }
-              LOG.warn("DEBUG: Adding appDataSource {} {}", appDataSource.getName(), Thread.currentThread().getId());
+              LOG.debug("Adding appDataSource {} {}", appDataSource.getName(), Thread.currentThread().getId());
               appDataSources.add(appDataSource);
             }
           }
@@ -708,8 +762,7 @@ public class StreamingContainerManager implements PlanContext
         }
         shutdownAllContainers(msg);
         this.forcedShutdown = true;
-      }
-      else {
+      } else {
         for (PTContainer c : pendingAllocation) {
           LOG.debug("Waiting for resource: {}m {}", c.getRequiredMemoryMB(), c);
         }
@@ -728,8 +781,7 @@ public class StreamingContainerManager implements PlanContext
             LOG.info("Container {}@{} startup timeout ({} ms).", c.getExternalId(), c.host, currentTms - sca.createdMillis);
             containerStopRequests.put(c.getExternalId(), c.getExternalId());
           }
-        }
-        else {
+        } else {
           if (currentTms - sca.lastHeartbeatMillis > this.vars.heartbeatTimeoutMillis) {
             if (!isApplicationIdle()) {
               // request stop (kill) as process may still be hanging around (would have been detected by Yarn otherwise)
@@ -756,8 +808,7 @@ public class StreamingContainerManager implements PlanContext
     try {
       statsRecorder.recordContainers(containers, currentTms);
       statsRecorder.recordOperators(getOperatorInfoList(), currentTms);
-    }
-    catch (Exception ex) {
+    } catch (Exception ex) {
       LOG.warn("Exception caught when recording stats", ex);
     }
   }
@@ -771,8 +822,8 @@ public class StreamingContainerManager implements PlanContext
         LOG.warn("Some operators are behind for more than {} windows! Trimming the end window stats map", this.vars.maxWindowsBehindForStats);
         while (endWindowStatsOperatorMap.size() > this.vars.maxWindowsBehindForStats) {
           LOG.debug("Removing incomplete end window stats for window id {}. Collected operator set: {}. Complete set: {}",
-            endWindowStatsOperatorMap.firstKey(),
-            endWindowStatsOperatorMap.get(endWindowStatsOperatorMap.firstKey()).keySet(), allCurrentOperators);
+              endWindowStatsOperatorMap.firstKey(),
+              endWindowStatsOperatorMap.get(endWindowStatsOperatorMap.firstKey()).keySet(), allCurrentOperators);
           endWindowStatsOperatorMap.remove(endWindowStatsOperatorMap.firstKey());
         }
       }
@@ -791,17 +842,14 @@ public class StreamingContainerManager implements PlanContext
             if (windowId < completeEndWindowStatsWindowId) {
               LOG.debug("Disregarding stale end window stats for window {}", windowId);
               endWindowStatsOperatorMap.remove(windowId);
-            }
-            else {
+            } else {
               break;
             }
-          }
-          else {
+          } else {
             endWindowStatsOperatorMap.remove(windowId);
             currentEndWindowStatsWindowId = windowId;
           }
-        }
-        else {
+        } else {
           // the old stats contains operators that do not exist any more
           // this is probably right after a partition happens.
           LOG.debug("Stats for non-existent operators detected. Disregarding end window stats for window {}", windowId);
@@ -839,7 +887,7 @@ public class StreamingContainerManager implements PlanContext
 
     for (OperatorMeta operatorMeta : logicalOperators) {
       AutoMetric.Aggregator aggregator = operatorMeta.getMetricAggregatorMeta() != null ?
-        operatorMeta.getMetricAggregatorMeta().getAggregator() : null;
+          operatorMeta.getMetricAggregatorMeta().getAggregator() : null;
       if (aggregator == null) {
         continue;
       }
@@ -878,7 +926,7 @@ public class StreamingContainerManager implements PlanContext
           logicalMetrics.put(operatorMeta.getName(), windowMetrics);
         }
         LOG.debug("Adding to logical metrics for {}", operatorMeta.getName());
-        windowMetrics.add(new Pair<Long, Map<String, Object>>(windowId, lm));
+        windowMetrics.add(new Pair<>(windowId, lm));
         Map<String, Object> oldValue = latestLogicalMetrics.put(operatorMeta.getName(), lm);
         if (oldValue == null) {
           try {
@@ -898,7 +946,8 @@ public class StreamingContainerManager implements PlanContext
   private void saveMetaInfo() throws IOException
   {
     Path file = new Path(this.vars.appPath, APP_META_FILENAME + "." + System.nanoTime());
-    try (FSDataOutputStream os = fileContext.create(file, EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE), Options.CreateOpts.CreateParent.createParent())) {      JSONObject top = new JSONObject();
+    try (FSDataOutputStream os = fileContext.create(file, EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE), Options.CreateOpts.CreateParent.createParent())) {
+      JSONObject top = new JSONObject();
       JSONObject attributes = new JSONObject();
       for (Map.Entry<Attribute<?>, Object> entry : this.plan.getLogicalPlan().getAttributes().entrySet()) {
         attributes.put(entry.getKey().getSimpleName(), entry.getValue());
@@ -1244,8 +1293,7 @@ public class StreamingContainerManager implements PlanContext
       scc.attributes.put(ContainerContext.BUFFER_SERVER_TOKEN, container.getBufferServerToken());
       scc.startWindowMillis = this.vars.windowStartMillis;
       return scc;
-    }
-    catch (CloneNotSupportedException ex) {
+    } catch (CloneNotSupportedException ex) {
       throw new RuntimeException("Cannot clone DAG attributes", ex);
     }
   }
@@ -1282,15 +1330,14 @@ public class StreamingContainerManager implements PlanContext
         // operator expected active, check remote status
         if (ds == null) {
           sca.deployOpers.add(oper);
-        }
-        else {
+        } else {
           switch (ds) {
             case SHUTDOWN:
               // schedule operator deactivation against the windowId
               // will be processed once window is committed and all dependent operators completed processing
               long windowId = oper.stats.currentWindowId.get();
               if (ohb.windowStats != null && !ohb.windowStats.isEmpty()) {
-                windowId = ohb.windowStats.get(ohb.windowStats.size()-1).windowId;
+                windowId = ohb.windowStats.get(ohb.windowStats.size() - 1).windowId;
               }
               LOG.debug("Operator {} deactivated at window {}", oper, windowId);
               synchronized (this.shutdownOperators) {
@@ -1312,6 +1359,7 @@ public class StreamingContainerManager implements PlanContext
               recordEventAsync(new StramEvent.StopOperatorEvent(oper.getName(), oper.getId(), oper.getContainer().getExternalId()));
               break;
             case ACTIVE:
+            default:
               break;
           }
         }
@@ -1322,8 +1370,7 @@ public class StreamingContainerManager implements PlanContext
           recordEventAsync(new StramEvent.StopOperatorEvent(oper.getName(), oper.getId(), oper.getContainer().getExternalId()));
           oper.setState(State.PENDING_DEPLOY);
           sca.deployOpers.add(oper);
-        }
-        else {
+        } else {
           // operator is currently deployed, request undeploy
           sca.undeployOpers.add(oper.getId());
           slowestUpstreamOp.remove(oper);
@@ -1333,8 +1380,7 @@ public class StreamingContainerManager implements PlanContext
         if (ds == null) {
           // operator to be deployed
           sca.deployOpers.add(oper);
-        }
-        else {
+        } else {
           // operator was deployed in container
           PTContainer container = oper.getContainer();
           LOG.debug("{} marking deployed: {} remote status {}", container.getExternalId(), oper, ds);
@@ -1371,15 +1417,13 @@ public class StreamingContainerManager implements PlanContext
         // existing container or sandbox container for just the operator
         LOG.error("Initiating container restart after operator failure {}", oper);
         containerStopRequests.put(oper.getContainer().getExternalId(), oper.getContainer().getExternalId());
-      }
-      else {
+      } else {
         String msg = String.format("Shutdown after reaching failure threshold for %s", oper);
         LOG.warn(msg);
         shutdownAllContainers(msg);
         forcedShutdown = true;
       }
-    }
-    else {
+    } else {
       // should not get here
       LOG.warn("Failed operator {} {} {} to be undeployed by container", oper, oper.getState());
     }
@@ -1476,10 +1520,10 @@ public class StreamingContainerManager implements PlanContext
       if (shb.requestResponse != null) {
         for (StatsListener.OperatorResponse obj : shb.requestResponse) {
           if (obj instanceof OperatorResponse) {      // This is to identify platform requests
-            commandResponse.put((Long) obj.getResponseId(), obj.getResponse());
+            commandResponse.put((Long)obj.getResponseId(), obj.getResponse());
             LOG.debug(" Got back the response {} for the request {}", obj, obj.getResponseId());
-          }
-          else {       // This is to identify user requests
+          } else {
+            // This is to identify user requests
             oper.stats.responses.add(obj);
           }
         }
@@ -1520,7 +1564,7 @@ public class StreamingContainerManager implements PlanContext
           /* report checkpoint-ed WindowId status of the operator */
           if (stats.checkpoint instanceof Checkpoint) {
             if (oper.getRecentCheckpoint() == null || oper.getRecentCheckpoint().windowId < stats.checkpoint.getWindowId()) {
-              addCheckpoint(oper, (Checkpoint) stats.checkpoint);
+              addCheckpoint(oper, (Checkpoint)stats.checkpoint);
               if (stats.checkpointStats != null) {
                 status.checkpointStats = stats.checkpointStats;
                 status.checkpointTimeMA.add(stats.checkpointStats.checkpointTime);
@@ -1641,14 +1685,14 @@ public class StreamingContainerManager implements PlanContext
             endWindowStats.counters = stats.counters;
           }
           if (oper.getOperatorMeta().getMetricAggregatorMeta() != null &&
-            oper.getOperatorMeta().getMetricAggregatorMeta().getAggregator() != null) {
+              oper.getOperatorMeta().getMetricAggregatorMeta().getAggregator() != null) {
             endWindowStats.metrics = stats.metrics;
           }
 
           if (stats.windowId > currentEndWindowStatsWindowId) {
             Map<Integer, EndWindowStats> endWindowStatsMap = endWindowStatsOperatorMap.get(stats.windowId);
             if (endWindowStatsMap == null) {
-              endWindowStatsMap = new ConcurrentSkipListMap<Integer, EndWindowStats>();
+              endWindowStatsMap = new ConcurrentSkipListMap<>();
               Map<Integer, EndWindowStats> endWindowStatsMapPrevious =
                   endWindowStatsOperatorMap.putIfAbsent(stats.windowId, endWindowStatsMap);
               if (endWindowStatsMapPrevious != null) {
@@ -1729,9 +1773,9 @@ public class StreamingContainerManager implements PlanContext
           }
           status.tuplesProcessedPSMA.set(Math.round(tuplesProcessedPMSMA * 1000));
           status.tuplesEmittedPSMA.set(Math.round(tuplesEmittedPMSMA * 1000));
-        }
-        else {
-          //LOG.warn("This timestamp for {} is lower than the previous!! {} < {}", oper.getId(), maxEndWindowTimestamp, lastMaxEndWindowTimestamp);
+        } else {
+          //LOG.warn("This timestamp for {} is lower than the previous!! {} < {}", oper.getId(),
+          // maxEndWindowTimestamp, lastMaxEndWindowTimestamp);
         }
         operatorLastEndWindowTimestamps.put(oper.getId(), maxEndWindowTimestamp);
         status.listenerStats.add(statsList);
@@ -1863,13 +1907,11 @@ public class StreamingContainerManager implements PlanContext
             if (li.previous().windowId != checkpoint.windowId) {
               li.add(checkpoint);
             }
-          }
-          else {
+          } else {
             node.checkpoints.add(checkpoint);
           }
         }
-      }
-      else {
+      } else {
         node.checkpoints.add(checkpoint);
       }
     }
@@ -1878,8 +1920,8 @@ public class StreamingContainerManager implements PlanContext
   public static class UpdateCheckpointsContext
   {
     public final MutableLong committedWindowId = new MutableLong(Long.MAX_VALUE);
-    public final Set<PTOperator> visited = new LinkedHashSet<PTOperator>();
-    public final Set<PTOperator> blocked = new LinkedHashSet<PTOperator>();
+    public final Set<PTOperator> visited = new LinkedHashSet<>();
+    public final Set<PTOperator> blocked = new LinkedHashSet<>();
     public final long currentTms;
     public final boolean recovery;
     public final Map<OperatorMeta, Set<OperatorMeta>> checkpointGroups;
@@ -2017,11 +2059,10 @@ public class StreamingContainerManager implements PlanContext
             while (checkpoints.size() > 1 && ((c2 = checkpoints.get(1)).windowId) <= maxCheckpoint.windowId) {
               checkpoints.removeFirst();
               //LOG.debug("Checkpoint to delete: operator={} windowId={}", operator.getName(), c1);
-              this.purgeCheckpoints.add(new Pair<PTOperator, Long>(groupOper, c1.windowId));
+              this.purgeCheckpoints.add(new Pair<>(groupOper, c1.windowId));
               c1 = c2;
             }
-          }
-          else {
+          } else {
             if (ctx.recovery && checkpoints.isEmpty() && groupOper.isOperatorStateLess()) {
               LOG.debug("Adding checkpoint for stateless operator {} {}", groupOper, Codec.getStringWindowId(maxCheckpoint.windowId));
               c1 = groupOper.addCheckpoint(maxCheckpoint.windowId, this.vars.windowStartMillis);
@@ -2030,8 +2071,7 @@ public class StreamingContainerManager implements PlanContext
         }
         //LOG.debug("Operator {} checkpoints: commit {} recent {}", new Object[] {operator.getName(), c1, operator.checkpoints});
         groupOper.setRecoveryCheckpoint(c1);
-      }
-      else {
+      } else {
         LOG.debug("Skipping checkpoint update {} during {}", groupOper, groupOper.getState());
       }
     }
@@ -2118,8 +2158,7 @@ public class StreamingContainerManager implements PlanContext
           {
             try {
               operator.getOperatorMeta().getValue(OperatorContext.STORAGE_AGENT).delete(operator.getId(), windowId);
-            }
-            catch (IOException ex) {
+            } catch (IOException ex) {
               LOG.error("Failed to purge checkpoint for operator {} for windowId {}", operator, windowId, ex);
             }
           }
@@ -2133,7 +2172,7 @@ public class StreamingContainerManager implements PlanContext
             // address should be null only for a new container, in which case there should not be a purge request
             // TODO: logging added to find out how we got here
             LOG.warn("purge request w/o buffer server address source {} container {} checkpoints {}",
-              out, operator.getContainer(), operator.checkpoints);
+                out, operator.getContainer(), operator.checkpoints);
             continue;
           }
 
@@ -2146,8 +2185,7 @@ public class StreamingContainerManager implements PlanContext
             BufferServerController bsc = getBufferServerClient(operator);
             try {
               bsc.purge(null, sourceIdentifier, operator.checkpoints.getFirst().windowId - 1);
-            }
-            catch (RuntimeException re) {
+            } catch (RuntimeException re) {
               LOG.warn("Failed to purge {} {}", bsc.addr, sourceIdentifier, re);
             }
           }
@@ -2176,11 +2214,11 @@ public class StreamingContainerManager implements PlanContext
 
   private Map<PTContainer, List<PTOperator>> groupByContainer(Collection<PTOperator> operators)
   {
-    Map<PTContainer, List<PTOperator>> m = new HashMap<PTContainer, List<PTOperator>>();
+    Map<PTContainer, List<PTOperator>> m = new HashMap<>();
     for (PTOperator node : operators) {
       List<PTOperator> nodes = m.get(node.getContainer());
       if (nodes == null) {
-        nodes = new ArrayList<PTOperator>();
+        nodes = new ArrayList<>();
         m.put(node.getContainer(), nodes);
       }
       nodes.add(node);
@@ -2247,8 +2285,7 @@ public class StreamingContainerManager implements PlanContext
                       // ensures new subscriber starting to read from checkpoint will wait until publisher redeploy cycle is complete
                       try {
                         bsc.reset(null, sourceIdentifier, 0);
-                      }
-                      catch (Exception ex) {
+                      } catch (Exception ex) {
                         LOG.error("Failed to reset buffer server {} {}", sourceIdentifier, ex);
                       }
                     }
@@ -2283,8 +2320,7 @@ public class StreamingContainerManager implements PlanContext
         }
       }
 
-    }
-    finally {
+    } finally {
       this.deployChangeCnt++;
       this.deployChangeInProgress.set(false);
     }
@@ -2312,7 +2348,7 @@ public class StreamingContainerManager implements PlanContext
 
   public List<OperatorInfo> getOperatorInfoList()
   {
-    List<OperatorInfo> infoList = new ArrayList<OperatorInfo>();
+    List<OperatorInfo> infoList = new ArrayList<>();
 
     for (PTContainer container : this.plan.getContainers()) {
       for (PTOperator operator : container.getOperators()) {
@@ -2352,7 +2388,7 @@ public class StreamingContainerManager implements PlanContext
 
   public List<LogicalOperatorInfo> getLogicalOperatorInfoList()
   {
-    List<LogicalOperatorInfo> infoList = new ArrayList<LogicalOperatorInfo>();
+    List<LogicalOperatorInfo> infoList = new ArrayList<>();
     Collection<OperatorMeta> allOperators = getLogicalPlan().getAllOperators();
     for (OperatorMeta operatorMeta : allOperators) {
       infoList.add(fillLogicalOperatorInfo(operatorMeta));
@@ -2445,19 +2481,18 @@ public class StreamingContainerManager implements PlanContext
     loi.totalTuplesEmitted = operator.getStatus().totalTuplesEmitted;
     loi.totalTuplesProcessed = operator.getStatus().totalTuplesProcessed;
     loi.failureCount = operator.getStatus().failureCount;
-    loi.status = new HashMap<String, MutableInt>();
-    loi.partitions = new TreeSet<Integer>();
-    loi.unifiers = new TreeSet<Integer>();
-    loi.containerIds = new TreeSet<String>();
-    loi.hosts = new TreeSet<String>();
+    loi.status = new HashMap<>();
+    loi.partitions = new TreeSet<>();
+    loi.unifiers = new TreeSet<>();
+    loi.containerIds = new TreeSet<>();
+    loi.hosts = new TreeSet<>();
     Collection<PTOperator> physicalOperators = getPhysicalPlan().getAllOperators(operator);
     NumberAggregate.LongAggregate checkpointTimeAggregate = new NumberAggregate.LongAggregate();
     for (PTOperator physicalOperator : physicalOperators) {
       OperatorStatus os = physicalOperator.stats;
       if (physicalOperator.isUnifier()) {
         loi.unifiers.add(physicalOperator.getId());
-      }
-      else {
+      } else {
         loi.partitions.add(physicalOperator.getId());
 
         // exclude unifier, not sure if we should include it in the future
@@ -2556,7 +2591,7 @@ public class StreamingContainerManager implements PlanContext
 
   public List<StreamInfo> getStreamInfoList()
   {
-    List<StreamInfo> infoList = new ArrayList<StreamInfo>();
+    List<StreamInfo> infoList = new ArrayList<>();
 
     for (PTContainer container : this.plan.getContainers()) {
       for (PTOperator operator : container.getOperators()) {
@@ -2572,8 +2607,7 @@ public class StreamingContainerManager implements PlanContext
             p.operatorId = String.valueOf(input.target.getId());
             if (input.target.isUnifier()) {
               p.portName = StreamingContainer.getUnifierInputPortName(input.portName, operator.getId(), output.portName);
-            }
-            else {
+            } else {
               p.portName = input.portName;
             }
             si.sinks.add(p);
@@ -2587,7 +2621,7 @@ public class StreamingContainerManager implements PlanContext
 
   private static class RecordingRequestFilter implements Predicate<StramToNodeRequest>
   {
-    final static Set<StramToNodeRequest.RequestType> MATCH_TYPES = Sets.newHashSet(StramToNodeRequest.RequestType.START_RECORDING, StramToNodeRequest.RequestType.STOP_RECORDING, StramToNodeRequest.RequestType.SYNC_RECORDING);
+    static final Set<StramToNodeRequest.RequestType> MATCH_TYPES = Sets.newHashSet(StramToNodeRequest.RequestType.START_RECORDING, StramToNodeRequest.RequestType.STOP_RECORDING, StramToNodeRequest.RequestType.SYNC_RECORDING);
 
     @Override
     public boolean apply(@Nullable StramToNodeRequest input)
@@ -2623,7 +2657,7 @@ public class StreamingContainerManager implements PlanContext
   private void updateOnDeployRequests(PTOperator p, Predicate<StramToNodeRequest> superseded, StramToNodeRequest newRequest)
   {
     // filter existing requests
-    List<StramToNodeRequest> cloneRequests = new ArrayList<StramToNodeRequest>(p.deployRequests.size());
+    List<StramToNodeRequest> cloneRequests = new ArrayList<>(p.deployRequests.size());
     for (StramToNodeRequest existingRequest : p.deployRequests) {
       if (!superseded.apply(existingRequest)) {
         cloneRequests.add(existingRequest);
@@ -2699,11 +2733,13 @@ public class StreamingContainerManager implements PlanContext
     this.containerStopRequests.put(containerId, containerId);
   }
 
-  public Recoverable getSetOperatorProperty(String operatorName, String propertyName, String propertyValue) {
+  public Recoverable getSetOperatorProperty(String operatorName, String propertyName, String propertyValue)
+  {
     return new SetOperatorProperty(operatorName, propertyName, propertyValue);
   }
 
-  public Recoverable getSetPhysicalOperatorProperty(int operatorId, String propertyName, String propertyValue) {
+  public Recoverable getSetPhysicalOperatorProperty(int operatorId, String propertyName, String propertyValue)
+  {
     return new SetPhysicalOperatorProperty(operatorId, propertyName, propertyValue);
   }
 
@@ -2807,7 +2843,7 @@ public class StreamingContainerManager implements PlanContext
     task.requestId = nodeToStramRequestIds.incrementAndGet();
     task.waitTime = waitTime;
     request.requestId = task.requestId;
-    FutureTask<Object> future = new FutureTask<Object>(task);
+    FutureTask<Object> future = new FutureTask<>(task);
     dispatch(future);
     return future;
   }
@@ -2817,8 +2853,7 @@ public class StreamingContainerManager implements PlanContext
     LogicalPlan lp = getLogicalPlan();
     try {
       return lp.getAttributes().clone();
-    }
-    catch (CloneNotSupportedException ex) {
+    } catch (CloneNotSupportedException ex) {
       throw new RuntimeException("Cannot clone DAG attributes", ex);
     }
   }
@@ -2831,8 +2866,7 @@ public class StreamingContainerManager implements PlanContext
     }
     try {
       return logicalOperator.getAttributes().clone();
-    }
-    catch (CloneNotSupportedException ex) {
+    } catch (CloneNotSupportedException ex) {
       throw new RuntimeException("Cannot clone operator attributes", ex);
     }
   }
@@ -2884,7 +2918,7 @@ public class StreamingContainerManager implements PlanContext
   public FutureTask<Object> logicalPlanModification(List<LogicalPlanRequest> requests) throws Exception
   {
     // delegate processing to dispatch thread
-    FutureTask<Object> future = new FutureTask<Object>(new LogicalPlanChangeRunnable(requests));
+    FutureTask<Object> future = new FutureTask<>(new LogicalPlanChangeRunnable(requests));
     dispatch(future);
     //LOG.info("Scheduled plan changes: {}", requests);
     return future;
@@ -2960,8 +2994,7 @@ public class StreamingContainerManager implements PlanContext
       if (journal != null) {
         journal.write(operation);
       }
-    }
-    catch (Exception e) {
+    } catch (Exception e) {
       throw new IllegalStateException("Failed to write to journal " + operation, e);
     }
   }
@@ -2978,12 +3011,11 @@ public class StreamingContainerManager implements PlanContext
   public static StreamingContainerManager getInstance(RecoveryHandler rh, LogicalPlan dag, boolean enableEventRecording) throws IOException
   {
     try {
-      CheckpointState checkpointedState = (CheckpointState) rh.restore();
+      CheckpointState checkpointedState = (CheckpointState)rh.restore();
       StreamingContainerManager scm;
       if (checkpointedState == null) {
         scm = new StreamingContainerManager(dag, enableEventRecording, new SystemClock());
-      }
-      else {
+      } else {
         // find better way to support final transient members
         PhysicalPlan plan = checkpointedState.physicalPlan;
         plan.getLogicalPlan().setAttribute(LogicalPlan.APPLICATION_ATTEMPT_ID, dag.getAttributes().get(LogicalPlan.APPLICATION_ATTEMPT_ID));
@@ -2993,8 +3025,7 @@ public class StreamingContainerManager implements PlanContext
             f.setAccessible(true);
             try {
               f.set(plan, scm);
-            }
-            catch (Exception e) {
+            } catch (Exception e) {
               throw new RuntimeException("Failed to set " + f, e);
             }
             f.setAccessible(false);
@@ -3015,8 +3046,7 @@ public class StreamingContainerManager implements PlanContext
             LOG.debug("Restore container agent {} for {}", c.getExternalId(), c);
             StreamingContainerAgent sca = new StreamingContainerAgent(c, scm.newStreamingContainerContext(c), scm);
             scm.containers.put(c.getExternalId(), sca);
-          }
-          else {
+          } else {
             LOG.debug("Requesting new resource for {}", c.toIdStateString());
             scm.requestContainer(c);
           }
@@ -3025,8 +3055,7 @@ public class StreamingContainerManager implements PlanContext
       scm.recoveryHandler = rh;
       scm.checkpoint();
       return scm;
-    }
-    catch (IOException e) {
+    } catch (IOException e) {
       throw new IllegalStateException("Failed to read checkpointed state", e);
     }
   }
@@ -3106,16 +3135,16 @@ public class StreamingContainerManager implements PlanContext
 
       this.finals = new FinalVars(finals, lp);
       StorageAgent sa = lp.getValue(OperatorContext.STORAGE_AGENT);
-      if(sa instanceof AsyncFSStorageAgent){
+      if (sa instanceof AsyncFSStorageAgent) {
         // replace the default storage agent, if present
-        AsyncFSStorageAgent fssa = (AsyncFSStorageAgent) sa;
+        AsyncFSStorageAgent fssa = (AsyncFSStorageAgent)sa;
         if (fssa.path.contains(oldAppId)) {
           fssa = new AsyncFSStorageAgent(fssa.path.replace(oldAppId, appId), conf);
           lp.setAttribute(OperatorContext.STORAGE_AGENT, fssa);
         }
       } else if (sa instanceof FSStorageAgent) {
         // replace the default storage agent, if present
-        FSStorageAgent fssa = (FSStorageAgent) sa;
+        FSStorageAgent fssa = (FSStorageAgent)sa;
         if (fssa.path.contains(oldAppId)) {
           fssa = new FSStorageAgent(fssa.path.replace(oldAppId, appId), conf);
           lp.setAttribute(OperatorContext.STORAGE_AGENT, fssa);
@@ -3182,7 +3211,7 @@ public class StreamingContainerManager implements PlanContext
         Thread.sleep(100);
         LOG.debug("Polling for a response to request with Id {}", requestId);
       }
-      if(obj != null) {
+      if (obj != null) {
         commandResponse.invalidate(requestId);
         return obj;
       }

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/85db448e/engine/src/main/java/com/datatorrent/stram/StreamingContainerParent.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerParent.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerParent.java
index fe3f399..b36529a 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerParent.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerParent.java
@@ -18,24 +18,29 @@
  */
 package com.datatorrent.stram;
 
-import com.datatorrent.stram.api.StramEvent.ContainerErrorEvent;
-import com.datatorrent.stram.api.StramEvent.OperatorErrorEvent;
-import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol;
-import com.datatorrent.stram.util.SecureExecutor;
-import com.datatorrent.stram.webapp.OperatorInfo;
 import java.io.IOException;
 import java.net.InetSocketAddress;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
-import org.apache.hadoop.ipc.*;
+import org.apache.hadoop.ipc.ProtocolSignature;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.authorize.PolicyProvider;
 import org.apache.hadoop.security.authorize.Service;
 import org.apache.hadoop.security.token.SecretManager;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
+import com.datatorrent.stram.api.StramEvent.ContainerErrorEvent;
+import com.datatorrent.stram.api.StramEvent.OperatorErrorEvent;
+import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol;
+import com.datatorrent.stram.util.SecureExecutor;
+import com.datatorrent.stram.webapp.OperatorInfo;
 
 /**
  *
@@ -44,7 +49,8 @@ import org.slf4j.LoggerFactory;
  *
  * @since 0.3.2
  */
-public class StreamingContainerParent extends org.apache.hadoop.service.CompositeService implements StreamingContainerUmbilicalProtocol {
+public class StreamingContainerParent extends org.apache.hadoop.service.CompositeService implements StreamingContainerUmbilicalProtocol
+{
 
   private static final Logger LOG = LoggerFactory.getLogger(StreamingContainerParent.class);
   private Server server;
@@ -53,7 +59,8 @@ public class StreamingContainerParent extends org.apache.hadoop.service.Composit
   private final StreamingContainerManager dagManager;
   private final int listenerThreadCount;
 
-  public StreamingContainerParent(String name, StreamingContainerManager dnodeMgr, SecretManager<? extends TokenIdentifier> secretManager, int listenerThreadCount) {
+  public StreamingContainerParent(String name, StreamingContainerManager dnodeMgr, SecretManager<? extends TokenIdentifier> secretManager, int listenerThreadCount)
+  {
     super(name);
     this.dagManager = dnodeMgr;
     this.tokenSecretManager = secretManager;
@@ -61,42 +68,49 @@ public class StreamingContainerParent extends org.apache.hadoop.service.Composit
   }
 
   @Override
-  public void init(Configuration conf) {
-   super.init(conf);
+  public void init(Configuration conf)
+  {
+    super.init(conf);
   }
 
   @Override
-  public void start() {
+  public void start()
+  {
     startRpcServer();
     super.start();
   }
 
   @Override
-  public void stop() {
+  public void stop()
+  {
     stopRpcServer();
     super.stop();
   }
 
-  protected void startRpcServer() {
+  protected void startRpcServer()
+  {
     Configuration conf = getConfig();
     LOG.info("Config: " + conf);
     LOG.info("Listener thread count " + listenerThreadCount);
     try {
       server = new RPC.Builder(conf).setProtocol(StreamingContainerUmbilicalProtocol.class).setInstance(this)
-          .setBindAddress("0.0.0.0").setPort(0).setNumHandlers(listenerThreadCount).setSecretManager(tokenSecretManager).setVerbose(false).build();
+          .setBindAddress("0.0.0.0").setPort(0).setNumHandlers(listenerThreadCount).setSecretManager(tokenSecretManager)
+          .setVerbose(false).build();
 
       // Enable service authorization?
       if (conf.getBoolean(
           CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION,
           false)) {
         //refreshServiceAcls(conf, new MRAMPolicyProvider());
-        server.refreshServiceAcl(conf, new PolicyProvider() {
+        server.refreshServiceAcl(conf, new PolicyProvider()
+        {
 
           @Override
           public Service[] getServices()
           {
-            return (new Service[] {
-              new Service(StreamingContainerUmbilicalProtocol.class.getName(), StreamingContainerUmbilicalProtocol.class)
+            return (new Service[]{
+                new Service(StreamingContainerUmbilicalProtocol.class
+                    .getName(), StreamingContainerUmbilicalProtocol.class)
             });
           }
 
@@ -111,11 +125,13 @@ public class StreamingContainerParent extends org.apache.hadoop.service.Composit
     }
   }
 
-  protected void stopRpcServer() {
+  protected void stopRpcServer()
+  {
     server.stop();
   }
 
-  public InetSocketAddress getAddress() {
+  public InetSocketAddress getAddress()
+  {
     return address;
   }
 
@@ -126,24 +142,27 @@ public class StreamingContainerParent extends org.apache.hadoop.service.Composit
   }
 
   void refreshServiceAcls(Configuration configuration,
-      PolicyProvider policyProvider) {
+      PolicyProvider policyProvider)
+  {
     this.server.refreshServiceAcl(configuration, policyProvider);
   }
 
   @Override
   public ProtocolSignature getProtocolSignature(String protocol,
-      long clientVersion, int clientMethodsHash) throws IOException {
-      return ProtocolSignature.getProtocolSignature(this,
-          protocol, clientVersion, clientMethodsHash);
+      long clientVersion, int clientMethodsHash) throws IOException
+  {
+    return ProtocolSignature.getProtocolSignature(this, protocol, clientVersion, clientMethodsHash);
   }
 
   @Override
-  public long getProtocolVersion(String arg0, long arg1) throws IOException {
+  public long getProtocolVersion(String arg0, long arg1) throws IOException
+  {
     return StreamingContainerUmbilicalProtocol.versionID;
   }
 
   @Override
-  public void log(String containerId, String msg) throws IOException {
+  public void log(String containerId, String msg) throws IOException
+  {
     LOG.info("child msg: {} context: {}", msg, dagManager.getContainerAgent(containerId).container);
   }
 
@@ -152,8 +171,7 @@ public class StreamingContainerParent extends org.apache.hadoop.service.Composit
   {
     if (operators == null || operators.length == 0) {
       dagManager.recordEventAsync(new ContainerErrorEvent(containerId, msg));
-    }
-    else {
+    } else {
       for (int operator : operators) {
         OperatorInfo operatorInfo = dagManager.getOperatorInfo(operator);
         if (operatorInfo != null) {
@@ -163,22 +181,23 @@ public class StreamingContainerParent extends org.apache.hadoop.service.Composit
     }
     try {
       log(containerId, msg);
-    }
-    catch (IOException ex) {
+    } catch (IOException ex) {
       // ignore
     }
   }
 
   @Override
   public StreamingContainerContext getInitContext(String containerId)
-      throws IOException {
+      throws IOException
+  {
     StreamingContainerAgent sca = dagManager.getContainerAgent(containerId);
 
     return sca.getInitContext();
   }
 
   @Override
-  public ContainerHeartbeatResponse processHeartbeat(ContainerHeartbeat msg) {
+  public ContainerHeartbeatResponse processHeartbeat(ContainerHeartbeat msg)
+  {
     // -- TODO
     // Change to use some sort of a annotation that developers can use to specify secure code
     // For now using SecureExecutor work load. Also change sig to throw Exception
@@ -186,11 +205,13 @@ public class StreamingContainerParent extends org.apache.hadoop.service.Composit
     if (msg.sentTms - now > 50) {
       LOG.warn("Child container heartbeat sent time for {} ({}) is greater than the receive timestamp in AM ({}). Please make sure the clocks are in sync", msg.getContainerId(), msg.sentTms, now);
     }
-    //LOG.debug("RPC latency from child container {} is {} ms (according to system clocks)", msg.getContainerId(), now - msg.sentTms);
+    //LOG.debug("RPC latency from child container {} is {} ms (according to system clocks)", msg.getContainerId(),
+    // now - msg.sentTms);
     dagManager.updateRPCLatency(msg.getContainerId(), now - msg.sentTms);
     try {
       final ContainerHeartbeat fmsg = msg;
-      return SecureExecutor.execute(new SecureExecutor.WorkLoad<ContainerHeartbeatResponse>() {
+      return SecureExecutor.execute(new SecureExecutor.WorkLoad<ContainerHeartbeatResponse>()
+      {
         @Override
         public ContainerHeartbeatResponse run()
         {

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/85db448e/engine/src/main/java/com/datatorrent/stram/StreamingContainerSecurityInfo.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerSecurityInfo.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerSecurityInfo.java
index 5e30887..60c0d8a 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerSecurityInfo.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerSecurityInfo.java
@@ -49,10 +49,9 @@ public class StreamingContainerSecurityInfo extends SecurityInfo
   public TokenInfo getTokenInfo(Class<?> type, Configuration c)
   {
     TokenInfo tokenInfo = null;
-    if (type.equals(StreamingContainerUmbilicalProtocol.class))
-    {
-        tokenInfo = new TokenInfo() {
-
+    if (type.equals(StreamingContainerUmbilicalProtocol.class)) {
+      tokenInfo = new TokenInfo()
+      {
         @Override
         public Class<? extends TokenSelector<? extends TokenIdentifier>> value()
         {
@@ -64,7 +63,6 @@ public class StreamingContainerSecurityInfo extends SecurityInfo
         {
           return null;
         }
-
       };
     }
     return tokenInfo;

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/85db448e/engine/src/main/java/com/datatorrent/stram/StringCodecs.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StringCodecs.java b/engine/src/main/java/com/datatorrent/stram/StringCodecs.java
index 2c0a0bc..8bd5ed5 100644
--- a/engine/src/main/java/com/datatorrent/stram/StringCodecs.java
+++ b/engine/src/main/java/com/datatorrent/stram/StringCodecs.java
@@ -19,9 +19,9 @@
 package com.datatorrent.stram;
 
 import java.net.URI;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -29,8 +29,9 @@ import org.slf4j.LoggerFactory;
 import org.apache.commons.beanutils.ConvertUtils;
 import org.apache.commons.beanutils.Converter;
 
+import com.google.common.base.Throwables;
+
 import com.datatorrent.api.StringCodec;
-import com.datatorrent.netlet.util.DTThrowable;
 
 /**
  * <p>StringCodecs class.</p>
@@ -39,8 +40,8 @@ import com.datatorrent.netlet.util.DTThrowable;
  */
 public class StringCodecs
 {
-  private static final ConcurrentHashMap<ClassLoader, Boolean> classLoaders = new ConcurrentHashMap<ClassLoader, Boolean>();
-  private static final Map<Class<?>, Class<? extends StringCodec<?>>> codecs = new HashMap<Class<?>, Class<? extends StringCodec<?>>>();
+  private static final ConcurrentHashMap<ClassLoader, Boolean> classLoaders = new ConcurrentHashMap<>();
+  private static final Map<Class<?>, Class<? extends StringCodec<?>>> codecs = new HashMap<>();
   private static final Logger LOG = LoggerFactory.getLogger(StringCodecs.class);
 
   private StringCodecs()
@@ -69,11 +70,9 @@ public class StringCodecs
           StringCodec instance;
           try {
             instance = codec.newInstance();
-          }
-          catch (IllegalAccessException ex) {
+          } catch (IllegalAccessException ex) {
             throw new RuntimeException("Internal Error - it's impossible for this exception to be thrown!", ex);
-          }
-          catch (InstantiationException ex) {
+          } catch (InstantiationException ex) {
             throw new RuntimeException("Internal Error - it's impossible for this exception to be thrown!", ex);
           }
 
@@ -113,7 +112,7 @@ public class StringCodecs
       try {
         register(entry.getValue(), entry.getKey());
       } catch (Exception ex) {
-        DTThrowable.rethrow(ex);
+        throw Throwables.propagate(ex);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/85db448e/engine/src/main/java/com/datatorrent/stram/api/ContainerContext.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/api/ContainerContext.java b/engine/src/main/java/com/datatorrent/stram/api/ContainerContext.java
index 5f13797..605944e 100644
--- a/engine/src/main/java/com/datatorrent/stram/api/ContainerContext.java
+++ b/engine/src/main/java/com/datatorrent/stram/api/ContainerContext.java
@@ -29,10 +29,10 @@ import com.datatorrent.api.Context;
  */
 public interface ContainerContext extends Context
 {
-  public static final Attribute<String> IDENTIFIER = new Attribute<String>("unknown_container_id");
-  public static final Attribute<Integer> BUFFER_SERVER_MB = new Attribute<Integer>(8*64);
-  public static final Attribute<byte[]> BUFFER_SERVER_TOKEN = new Attribute<byte[]>(null, null);
-  public static final Attribute<RequestFactory> REQUEST_FACTORY = new Attribute<RequestFactory>(null, null);
+  public static final Attribute<String> IDENTIFIER = new Attribute<>("unknown_container_id");
+  public static final Attribute<Integer> BUFFER_SERVER_MB = new Attribute<>(8 * 64);
+  public static final Attribute<byte[]> BUFFER_SERVER_TOKEN = new Attribute<>(null, null);
+  public static final Attribute<RequestFactory> REQUEST_FACTORY = new Attribute<>(null, null);
   @SuppressWarnings("FieldNameHidesFieldInSuperclass")
   long serialVersionUID = AttributeInitializer.initialize(ContainerContext.class);
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/85db448e/engine/src/main/java/com/datatorrent/stram/api/ContainerEvent.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/api/ContainerEvent.java b/engine/src/main/java/com/datatorrent/stram/api/ContainerEvent.java
index d200803..52f0959 100644
--- a/engine/src/main/java/com/datatorrent/stram/api/ContainerEvent.java
+++ b/engine/src/main/java/com/datatorrent/stram/api/ContainerEvent.java
@@ -32,15 +32,15 @@ import com.datatorrent.stram.engine.StreamContext;
  */
 public interface ContainerEvent
 {
-  Class<?> CONTAINER_EVENTS_LISTENERS[] = new Class<?>[] {
-    com.datatorrent.stram.engine.BufferServerStatsSubscriber.class,
-    com.datatorrent.stram.debug.TupleRecorderCollection.class
+  Class<?>[] CONTAINER_EVENTS_LISTENERS = new Class<?>[]{
+      com.datatorrent.stram.engine.BufferServerStatsSubscriber.class,
+      com.datatorrent.stram.debug.TupleRecorderCollection.class
   };
 
   /**
    * Node event used for various events associated with nodes.
    */
-  public static abstract class NodeEvent implements ContainerEvent
+  abstract class NodeEvent implements ContainerEvent
   {
     private Node<?> node;
 
@@ -59,7 +59,7 @@ public interface ContainerEvent
   /**
    * Event representing node activation.
    */
-  public static class NodeActivationEvent extends NodeEvent
+  class NodeActivationEvent extends NodeEvent
   {
     public NodeActivationEvent(Node<?> node)
     {
@@ -71,7 +71,7 @@ public interface ContainerEvent
   /**
    * Event representing node deactivation.
    */
-  public static class NodeDeactivationEvent extends NodeEvent
+  class NodeDeactivationEvent extends NodeEvent
   {
     public NodeDeactivationEvent(Node<?> node)
     {
@@ -83,7 +83,7 @@ public interface ContainerEvent
   /**
    * Event representing stats in the container from the ContainerStats object.
    */
-  public static class ContainerStatsEvent implements ContainerEvent
+  class ContainerStatsEvent implements ContainerEvent
   {
     private ContainerStats containerStats;
 
@@ -102,7 +102,7 @@ public interface ContainerEvent
   /**
    * Event representing streams.
    */
-  public static abstract class StreamEvent implements ContainerEvent
+  abstract class StreamEvent implements ContainerEvent
   {
     private ComponentContextPair<Stream, StreamContext> stream;
 
@@ -121,7 +121,7 @@ public interface ContainerEvent
   /**
    * Event representing stream activation.
    */
-  public static class StreamActivationEvent extends StreamEvent
+  class StreamActivationEvent extends StreamEvent
   {
     public StreamActivationEvent(ComponentContextPair<Stream, StreamContext> stream)
     {
@@ -133,7 +133,7 @@ public interface ContainerEvent
   /**
    * Event representing stream deactivation.
    */
-  public static class StreamDeactivationEvent extends StreamEvent
+  class StreamDeactivationEvent extends StreamEvent
   {
     public StreamDeactivationEvent(ComponentContextPair<Stream, StreamContext> stream)
     {



Mime
View raw message