apex-commits mailing list archives

From t..@apache.org
Subject [08/16] incubator-apex-core git commit: APEXCORE-423 style fix for apex-core engine
Date Sat, 09 Apr 2016 03:52:26 GMT
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/85db448e/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
index aabd9ad..53e4432 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
@@ -20,34 +20,55 @@ package com.datatorrent.stram.plan.physical;
 
 import java.io.IOException;
 import java.io.Serializable;
-import java.util.*;
-import java.util.Map.Entry;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Stack;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.commons.lang.StringUtils;
+
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Throwables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
+import com.datatorrent.api.AffinityRule;
 import com.datatorrent.api.AffinityRule.Type;
+import com.datatorrent.api.AffinityRulesSet;
+import com.datatorrent.api.Context;
 import com.datatorrent.api.Context.DAGContext;
 import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.api.Context.PortContext;
-import com.datatorrent.api.*;
 import com.datatorrent.api.DAG.Locality;
+import com.datatorrent.api.DefaultPartition;
+import com.datatorrent.api.Operator;
 import com.datatorrent.api.Operator.InputPort;
+import com.datatorrent.api.Partitioner;
 import com.datatorrent.api.Partitioner.Partition;
 import com.datatorrent.api.Partitioner.PartitionKeys;
+import com.datatorrent.api.StatsListener;
 import com.datatorrent.api.StatsListener.OperatorRequest;
+import com.datatorrent.api.StorageAgent;
+import com.datatorrent.api.StreamCodec;
 import com.datatorrent.api.annotation.Stateless;
 import com.datatorrent.common.util.AsyncFSStorageAgent;
-import com.datatorrent.netlet.util.DTThrowable;
 import com.datatorrent.stram.Journal.Recoverable;
 import com.datatorrent.stram.api.Checkpoint;
 import com.datatorrent.stram.api.StramEvent;
@@ -85,7 +106,8 @@ public class PhysicalPlan implements Serializable
   private static final long serialVersionUID = 201312112033L;
   private static final Logger LOG = LoggerFactory.getLogger(PhysicalPlan.class);
 
-  public static class LoadIndicator {
+  public static class LoadIndicator
+  {
     public final int indicator;
     public final String note;
 
@@ -98,10 +120,10 @@ public class PhysicalPlan implements Serializable
 
   private final AtomicInteger idSequence = new AtomicInteger();
   final AtomicInteger containerSeq = new AtomicInteger();
-  private LinkedHashMap<OperatorMeta, PMapping> logicalToPTOperator = new LinkedHashMap<OperatorMeta, PMapping>();
-  private final List<PTContainer> containers = new CopyOnWriteArrayList<PTContainer>();
+  private LinkedHashMap<OperatorMeta, PMapping> logicalToPTOperator = new LinkedHashMap<>();
+  private final List<PTContainer> containers = new CopyOnWriteArrayList<>();
   private final LogicalPlan dag;
-  private transient final PlanContext ctx;
+  private final transient PlanContext ctx;
   private int maxContainers = 1;
   private int availableMemoryMB = Integer.MAX_VALUE;
   private final LocalityPrefs localityPrefs = new LocalityPrefs();
@@ -116,12 +138,13 @@ public class PhysicalPlan implements Serializable
   private final AtomicInteger strCodecIdSequence = new AtomicInteger();
   private final Map<StreamCodec<?>, Integer> streamCodecIdentifiers = Maps.newHashMap();
 
-  private PTContainer getContainer(int index) {
+  private PTContainer getContainer(int index)
+  {
     if (index >= containers.size()) {
       if (index >= maxContainers) {
         index = maxContainers - 1;
       }
-      for (int i=containers.size(); i<index+1; i++) {
+      for (int i = containers.size(); i < index + 1; i++) {
         containers.add(i, new PTContainer(this));
       }
     }
@@ -131,7 +154,8 @@ public class PhysicalPlan implements Serializable
   /**
    * Interface to execution context that can be mocked for plan testing.
    */
-  public interface PlanContext {
+  public interface PlanContext
+  {
 
     /**
      * Record an event in the event log
@@ -139,7 +163,7 @@ public class PhysicalPlan implements Serializable
      * @param ev The event
      *
      */
-    public void recordEventAsync(StramEvent ev);
+    void recordEventAsync(StramEvent ev);
 
     /**
      * Request deployment change as sequence of undeploy, container start and deploy groups with dependency.
@@ -149,29 +173,30 @@ public class PhysicalPlan implements Serializable
      * @param startContainers
      * @param deploy
      */
-    public void deploy(Set<PTContainer> releaseContainers, Collection<PTOperator> undeploy, Set<PTContainer> startContainers, Collection<PTOperator> deploy);
+    void deploy(Set<PTContainer> releaseContainers, Collection<PTOperator> undeploy, Set<PTContainer> startContainers, Collection<PTOperator> deploy);
 
     /**
      * Trigger event to perform plan modification.
      * @param r
      */
-    public void dispatch(Runnable r);
+    void dispatch(Runnable r);
 
     /**
      * Write the recoverable operation to the log.
      * @param operation
      */
-    public void writeJournal(Recoverable operation);
+    void writeJournal(Recoverable operation);
 
-    public void addOperatorRequest(PTOperator oper, StramToNodeRequest request);
+    void addOperatorRequest(PTOperator oper, StramToNodeRequest request);
   }
 
   private static class StatsListenerProxy implements StatsListener, Serializable
   {
     private static final long serialVersionUID = 201312112033L;
-    final private OperatorMeta om;
+    private final OperatorMeta om;
 
-    private StatsListenerProxy(OperatorMeta om) {
+    private StatsListenerProxy(OperatorMeta om)
+    {
       this.om = om;
     }
 
@@ -189,8 +214,8 @@ public class PhysicalPlan implements Serializable
   {
     private static final long serialVersionUID = 201312112033L;
 
-    final private OperatorMeta logicalOperator;
-    private List<PTOperator> partitions = new LinkedList<PTOperator>();
+    private final OperatorMeta logicalOperator;
+    private List<PTOperator> partitions = new LinkedList<>();
     private final Map<LogicalPlan.OutputPortMeta, StreamMapping> outputStreams = Maps.newHashMap();
     private List<StatsListener> statsHandlers;
 
@@ -199,11 +224,13 @@ public class PhysicalPlan implements Serializable
      */
     private Set<OperatorMeta> parallelPartitions = Sets.newHashSet();
 
-    private PMapping(OperatorMeta om) {
+    private PMapping(OperatorMeta om)
+    {
       this.logicalOperator = om;
     }
 
-    private void addPartition(PTOperator p) {
+    private void addPartition(PTOperator p)
+    {
       partitions.add(p);
       p.statsListeners = this.statsHandlers;
     }
@@ -212,8 +239,9 @@ public class PhysicalPlan implements Serializable
      * Return all partitions and unifiers, except MxN unifiers
      * @return
      */
-    private Collection<PTOperator> getAllOperators() {
-      Collection<PTOperator> c = new ArrayList<PTOperator>(partitions.size() + 1);
+    private Collection<PTOperator> getAllOperators()
+    {
+      Collection<PTOperator> c = new ArrayList<>(partitions.size() + 1);
       c.addAll(partitions);
       for (StreamMapping ug : outputStreams.values()) {
         ug.addTo(c);
@@ -222,7 +250,8 @@ public class PhysicalPlan implements Serializable
     }
 
     @Override
-    public String toString() {
+    public String toString()
+    {
       return logicalOperator.toString();
     }
   }
@@ -245,7 +274,8 @@ public class PhysicalPlan implements Serializable
     private final Map<PMapping, LocalityPref> prefs = Maps.newHashMap();
     private final AtomicInteger groupSeq = new AtomicInteger();
 
-    void add(PMapping m, String group) {
+    void add(PMapping m, String group)
+    {
       if (group != null) {
         LocalityPref pref = null;
         for (LocalityPref lp : prefs.values()) {
@@ -266,7 +296,8 @@ public class PhysicalPlan implements Serializable
 
     // if netbeans is not smart, don't produce warnings in other IDE
     //@SuppressWarnings("null") /* for lp2.operators.add(m1); line below - netbeans is not very smart; you don't be an idiot! */
-    void setLocal(PMapping m1, PMapping m2) {
+    void setLocal(PMapping m1, PMapping m2)
+    {
       LocalityPref lp1 = prefs.get(m1);
       LocalityPref lp2 = prefs.get(m2);
 
@@ -304,14 +335,15 @@ public class PhysicalPlan implements Serializable
    * @param dag
    * @param ctx
    */
-  public PhysicalPlan(LogicalPlan dag, PlanContext ctx) {
+  public PhysicalPlan(LogicalPlan dag, PlanContext ctx)
+  {
 
     this.dag = dag;
     this.ctx = ctx;
     this.maxContainers = Math.max(dag.getMaxContainerCount(), 1);
     LOG.debug("Max containers: {}", this.maxContainers);
 
-    Stack<OperatorMeta> pendingNodes = new Stack<OperatorMeta>();
+    Stack<OperatorMeta> pendingNodes = new Stack<>();
 
     // Add logging operators for streams if not added already
     updatePersistOperatorStreamCodec(dag);
@@ -484,8 +516,9 @@ public class PhysicalPlan implements Serializable
               PTContainer firstContainer = operatorContainerMap.get(firstPtOperator);
               for (PTOperator secondPtOperator : secondMapping.partitions) {
                 PTContainer secondContainer = operatorContainerMap.get(secondPtOperator);
-                if (firstContainer == secondContainer || firstContainer.getStrictAntiPrefs().contains(secondContainer))
+                if (firstContainer == secondContainer || firstContainer.getStrictAntiPrefs().contains(secondContainer)) {
                   continue;
+                }
                 if (rule.isRelaxLocality()) {
                   firstContainer.getPreferredAntiPrefs().add(secondContainer);
                   secondContainer.getPreferredAntiPrefs().add(firstContainer);
@@ -510,9 +543,10 @@ public class PhysicalPlan implements Serializable
         for (StreamMeta s : n.getOutputStreams().values()) {
           if (s.getPersistOperator() != null) {
             InputPortMeta persistInputPort = s.getPersistOperatorInputPort();
-            StreamCodecWrapperForPersistance<?> persistCodec = (StreamCodecWrapperForPersistance<?>) persistInputPort.getAttributes().get(PortContext.STREAM_CODEC);
-            if (persistCodec == null)
+            StreamCodecWrapperForPersistance<?> persistCodec = (StreamCodecWrapperForPersistance<?>)persistInputPort.getAttributes().get(PortContext.STREAM_CODEC);
+            if (persistCodec == null) {
               continue;
+            }
             // Logging is enabled for the stream
             for (InputPortMeta portMeta : s.getSinksToPersist()) {
               updatePersistOperatorWithSinkPartitions(persistInputPort, s.getPersistOperator(), persistCodec, portMeta);
@@ -520,12 +554,12 @@ public class PhysicalPlan implements Serializable
           }
 
           // Check partitioning for persist operators per sink too
-          for (Entry<InputPortMeta, InputPortMeta> entry : s.sinkSpecificPersistInputPortMap.entrySet()) {
+          for (Map.Entry<InputPortMeta, InputPortMeta> entry : s.sinkSpecificPersistInputPortMap.entrySet()) {
             InputPortMeta persistInputPort = entry.getValue();
             StreamCodec<?> codec = persistInputPort.getAttributes().get(PortContext.STREAM_CODEC);
             if (codec != null) {
               if (codec instanceof StreamCodecWrapperForPersistance) {
-                StreamCodecWrapperForPersistance<?> persistCodec = (StreamCodecWrapperForPersistance<?>) codec;
+                StreamCodecWrapperForPersistance<?> persistCodec = (StreamCodecWrapperForPersistance<?>)codec;
                 updatePersistOperatorWithSinkPartitions(persistInputPort, s.sinkSpecificPersistOperatorMap.get(entry.getKey()), persistCodec, entry.getKey());
               }
             }
@@ -533,14 +567,14 @@ public class PhysicalPlan implements Serializable
         }
       }
     } catch (Exception e) {
-      DTThrowable.wrapIfChecked(e);
+      throw Throwables.propagate(e);
     }
   }
 
   private void updatePersistOperatorWithSinkPartitions(InputPortMeta persistInputPort, OperatorMeta persistOperatorMeta, StreamCodecWrapperForPersistance<?> persistCodec, InputPortMeta sinkPortMeta)
   {
     Collection<PTOperator> ptOperators = getOperators(sinkPortMeta.getOperatorWrapper());
-    Collection<PartitionKeys> partitionKeysList = new ArrayList<PartitionKeys>();
+    Collection<PartitionKeys> partitionKeysList = new ArrayList<>();
     for (PTOperator p : ptOperators) {
       PartitionKeys keys = p.partitionKeys.get(sinkPortMeta);
       partitionKeysList.add(keys);
@@ -551,7 +585,7 @@ public class PhysicalPlan implements Serializable
 
   private void updatePersistOperatorStreamCodec(LogicalPlan dag)
   {
-    HashMap<StreamMeta, StreamCodec<?>> streamMetaToCodecMap = new HashMap<StreamMeta, StreamCodec<?>>();
+    HashMap<StreamMeta, StreamCodec<?>> streamMetaToCodecMap = new HashMap<>();
     try {
       for (OperatorMeta n : dag.getAllOperators()) {
         for (StreamMeta s : n.getOutputStreams().values()) {
@@ -598,11 +632,12 @@ public class PhysicalPlan implements Serializable
         dag.setInputPortAttribute(entry.getKey().getPersistOperatorInputPort().getPortObject(), PortContext.STREAM_CODEC, entry.getValue());
       }
     } catch (Exception e) {
-      DTThrowable.wrapIfChecked(e);
+      throw Throwables.propagate(e);
     }
   }
 
-  private void setContainer(PTOperator pOperator, PTContainer container) {
+  private void setContainer(PTOperator pOperator, PTContainer container)
+  {
     LOG.debug("Setting container {} for {}", container, pOperator);
     assert (pOperator.container == null) : "Container already assigned for " + pOperator;
     pOperator.container = container;
@@ -638,9 +673,9 @@ public class PhysicalPlan implements Serializable
   private int getVCores(Collection<PTOperator> operators)
   {
     // this forms the groups of thread local operators in the given container
-    HashMap<PTOperator, Set<PTOperator>> groupMap = new HashMap<PTOperator, Set<PTOperator>>();
+    HashMap<PTOperator, Set<PTOperator>> groupMap = new HashMap<>();
     for (PTOperator operator : operators) {
-      Set<PTOperator> group = new HashSet<PTOperator>();
+      Set<PTOperator> group = new HashSet<>();
       group.add(operator);
       groupMap.put(operator, group);
     }
@@ -657,7 +692,7 @@ public class PhysicalPlan implements Serializable
         }
       }
     }
-    Set<PTOperator> visitedOperators = new HashSet<PTOperator>();
+    Set<PTOperator> visitedOperators = new HashSet<>();
     for (Map.Entry<PTOperator, Set<PTOperator>> group : groupMap.entrySet()) {
       if (!visitedOperators.contains(group.getKey())) {
         visitedOperators.addAll(group.getValue());
@@ -693,7 +728,7 @@ public class PhysicalPlan implements Serializable
     public List<InputPort<?>> getInputPorts()
     {
       if (inputPorts == null) {
-         inputPorts = getInputPortList(om.logicalOperator);
+        inputPorts = getInputPortList(om.logicalOperator);
       }
       return inputPorts;
     }
@@ -706,21 +741,20 @@ public class PhysicalPlan implements Serializable
 
     @SuppressWarnings("unchecked")
     Partitioner<Operator> partitioner = m.logicalOperator.getAttributes().contains(OperatorContext.PARTITIONER)
-                                        ? (Partitioner<Operator>)m.logicalOperator.getValue(OperatorContext.PARTITIONER)
-                                        : operator instanceof Partitioner? (Partitioner<Operator>)operator: null;
+        ? (Partitioner<Operator>)m.logicalOperator.getValue(OperatorContext.PARTITIONER)
+        : operator instanceof Partitioner ? (Partitioner<Operator>)operator : null;
 
-    Collection<Partition<Operator>> collection = new ArrayList<Partition<Operator>>(1);
-    DefaultPartition<Operator> firstPartition = new DefaultPartition<Operator>(operator);
+    Collection<Partition<Operator>> collection = new ArrayList<>(1);
+    DefaultPartition<Operator> firstPartition = new DefaultPartition<>(operator);
     collection.add(firstPartition);
 
     if (partitioner != null) {
       partitions = partitioner.definePartitions(collection, new PartitioningContextImpl(m, partitionCnt));
 
-      if(partitions == null || partitions.isEmpty()) {
+      if (partitions == null || partitions.isEmpty()) {
         throw new IllegalStateException("Partitioner returns null or empty.");
       }
-    }
-    else {
+    } else {
       //This handles the case when parallel partitioning is occurring. Partition count will be
       //Non zero in the case of parallel partitioning.
       for (int partitionCounter = 0; partitionCounter < partitionCnt - 1; partitionCounter++) {
@@ -732,14 +766,14 @@ public class PhysicalPlan implements Serializable
     Collection<StatsListener> statsListeners = m.logicalOperator.getValue(OperatorContext.STATS_LISTENERS);
     if (statsListeners != null && !statsListeners.isEmpty()) {
       if (m.statsHandlers == null) {
-        m.statsHandlers = new ArrayList<StatsListener>(statsListeners.size());
+        m.statsHandlers = new ArrayList<>(statsListeners.size());
       }
       m.statsHandlers.addAll(statsListeners);
     }
 
     if (m.logicalOperator.getOperator() instanceof StatsListener) {
       if (m.statsHandlers == null) {
-        m.statsHandlers = new ArrayList<StatsListener>(1);
+        m.statsHandlers = new ArrayList<>(1);
       }
       m.statsHandlers.add(new StatsListenerProxy(m.logicalOperator));
     }
@@ -762,7 +796,7 @@ public class PhysicalPlan implements Serializable
     final List<DefaultPartition<Operator>> currentPartitions;
     final Map<Partition<?>, PTOperator> currentPartitionMap;
     final Map<Integer, Partition<Operator>> operatorIdToPartition;
-    final List<Partition<Operator>> addedPartitions = new ArrayList<Partition<Operator>>();
+    final List<Partition<Operator>> addedPartitions = new ArrayList<>();
     Checkpoint minCheckpoint = null;
     Collection<Partition<Operator>> newPartitions = null;
 
@@ -770,7 +804,7 @@ public class PhysicalPlan implements Serializable
     {
       super(currentMapping, partitionCount);
       this.operators = currentMapping.partitions;
-      this.currentPartitions = new ArrayList<DefaultPartition<Operator>>(operators.size());
+      this.currentPartitions = new ArrayList<>(operators.size());
       this.currentPartitionMap = Maps.newHashMapWithExpectedSize(operators.size());
       this.operatorIdToPartition = Maps.newHashMapWithExpectedSize(operators.size());
 
@@ -786,13 +820,12 @@ public class PhysicalPlan implements Serializable
         // partitions will start from earliest checkpoint found (at least once semantics)
         if (minCheckpoint == null) {
           minCheckpoint = pOperator.recoveryCheckpoint;
-        }
-        else if (minCheckpoint.windowId > pOperator.recoveryCheckpoint.windowId) {
+        } else if (minCheckpoint.windowId > pOperator.recoveryCheckpoint.windowId) {
           minCheckpoint = pOperator.recoveryCheckpoint;
         }
 
         Operator partitionedOperator = loadOperator(pOperator);
-        DefaultPartition<Operator> partition = new DefaultPartition<Operator>(partitionedOperator, pks, pOperator.loadIndicator, pOperator.stats);
+        DefaultPartition<Operator> partition = new DefaultPartition<>(partitionedOperator, pks, pOperator.loadIndicator, pOperator.stats);
         currentPartitions.add(partition);
         currentPartitionMap.put(partition, pOperator);
         LOG.debug("partition load: {} {} {}", pOperator, partition.getPartitionKeys(), partition.getLoad());
@@ -812,8 +845,7 @@ public class PhysicalPlan implements Serializable
       @SuppressWarnings("unchecked")
       Partitioner<Operator> tmp = (Partitioner<Operator>)currentMapping.logicalOperator.getValue(OperatorContext.PARTITIONER);
       partitioner = tmp;
-    }
-    else if (operator instanceof Partitioner) {
+    } else if (operator instanceof Partitioner) {
       @SuppressWarnings("unchecked")
       Partitioner<Operator> tmp = (Partitioner<Operator>)operator;
       partitioner = tmp;
@@ -855,7 +887,7 @@ public class PhysicalPlan implements Serializable
       return;
     }
 
-    List<Partition<Operator>> addedPartitions = new ArrayList<Partition<Operator>>();
+    List<Partition<Operator>> addedPartitions = new ArrayList<>();
     // determine modifications of partition set, identify affected operator instance(s)
     for (Partition<Operator> newPartition : mainPC.newPartitions) {
       PTOperator op = mainPC.currentPartitionMap.remove(newPartition);
@@ -884,7 +916,7 @@ public class PhysicalPlan implements Serializable
 
     // process parallel partitions before removing operators from the plan
     LinkedHashMap<PMapping, RepartitionContext> partitionContexts = Maps.newLinkedHashMap();
-    Stack<OperatorMeta> parallelPartitions = new Stack<LogicalPlan.OperatorMeta>();
+    Stack<OperatorMeta> parallelPartitions = new Stack<>();
     parallelPartitions.addAll(currentMapping.parallelPartitions);
     pendingLoop:
     while (!parallelPartitions.isEmpty()) {
@@ -934,7 +966,7 @@ public class PhysicalPlan implements Serializable
     for (Map.Entry<PMapping, RepartitionContext> e : partitionContexts.entrySet()) {
       if (e.getValue() == null) {
         // no partitioner, add required operators
-        for (int i=0; i<addedPartitions.size(); i++) {
+        for (int i = 0; i < addedPartitions.size(); i++) {
           LOG.debug("Automatically adding to parallel partition {}", e.getKey());
           // set activation windowId to confirm to upstream checkpoints
           addPTOperator(e.getKey(), null, mainPC.minCheckpoint);
@@ -943,7 +975,7 @@ public class PhysicalPlan implements Serializable
         RepartitionContext pc = e.getValue();
         // track previous parallel partition mapping
         Map<Partition<Operator>, Partition<Operator>> prevMapping = Maps.newHashMap();
-        for (int i=0; i<mainPC.currentPartitions.size(); i++) {
+        for (int i = 0; i < mainPC.currentPartitions.size(); i++) {
           prevMapping.put(pc.currentPartitions.get(i), mainPC.currentPartitions.get(i));
         }
         // determine which new partitions match upstream, remaining to be treated as new operator
@@ -1037,14 +1069,13 @@ public class PhysicalPlan implements Serializable
         OperatorMeta sourceOM = sourceMapping.logicalOperator;
         if (sourceOM.getAttributes().contains(Context.OperatorContext.SLIDE_BY_WINDOW_COUNT)) {
           if (sourceOM.getValue(Context.OperatorContext.SLIDE_BY_WINDOW_COUNT) <
-            sourceOM.getValue(Context.OperatorContext.APPLICATION_WINDOW_COUNT)) {
+              sourceOM.getValue(Context.OperatorContext.APPLICATION_WINDOW_COUNT)) {
             slidingWindowCount = sourceOM.getValue(OperatorContext.SLIDE_BY_WINDOW_COUNT);
-          }
-          else {
+          } else {
             LOG.warn("Sliding Window Count {} should be less than APPLICATION WINDOW COUNT {}", sourceOM.getValue(Context.OperatorContext.SLIDE_BY_WINDOW_COUNT), sourceOM.getValue(Context.OperatorContext.APPLICATION_WINDOW_COUNT));
           }
         }
-        for (int i=0; i<m.partitions.size(); i++) {
+        for (int i = 0; i < m.partitions.size(); i++) {
           PTOperator oper = m.partitions.get(i);
           PTOperator sourceOper = sourceMapping.partitions.get(i);
           for (PTOutput sourceOut : sourceOper.outputs) {
@@ -1061,12 +1092,11 @@ public class PhysicalPlan implements Serializable
               PTInput input;
               if (slidingWindowCount > 0) {
                 PTOperator slidingUnifier = StreamMapping.createSlidingUnifier(sourceOut.logicalStream, this,
-                  sourceOM.getValue(Context.OperatorContext.APPLICATION_WINDOW_COUNT), slidingWindowCount);
+                    sourceOM.getValue(Context.OperatorContext.APPLICATION_WINDOW_COUNT), slidingWindowCount);
                 StreamMapping.addInput(slidingUnifier, sourceOut, null);
                 input = new PTInput(ipm.getKey().getPortName(), ipm.getValue(), oper, null, slidingUnifier.outputs.get(0), ipm.getKey().getValue(LogicalPlan.IS_CONNECTED_TO_DELAY_OPERATOR));
                 sourceMapping.outputStreams.get(ipm.getValue().getSource()).slidingUnifiers.add(slidingUnifier);
-              }
-              else {
+              } else {
                 input = new PTInput(ipm.getKey().getPortName(), ipm.getValue(), oper, null, sourceOut, ipm.getKey().getValue(LogicalPlan.IS_CONNECTED_TO_DELAY_OPERATOR));
               }
               oper.inputs.add(input);
@@ -1086,7 +1116,8 @@ public class PhysicalPlan implements Serializable
 
   }
 
-  public void deployChanges() {
+  public void deployChanges()
+  {
     Set<PTContainer> newContainers = Sets.newHashSet();
     Set<PTContainer> releaseContainers = Sets.newHashSet();
     assignContainers(newContainers, releaseContainers);
@@ -1193,7 +1224,7 @@ public class PhysicalPlan implements Serializable
       agent.save(oo, oper.id, windowId);
       if (agent instanceof AsyncFSStorageAgent) {
         AsyncFSStorageAgent asyncFSStorageAgent = (AsyncFSStorageAgent)agent;
-        if(!asyncFSStorageAgent.isSyncCheckpoint()) {
+        if (!asyncFSStorageAgent.isSyncCheckpoint()) {
           asyncFSStorageAgent.copyToHDFS(oper.id, windowId);
         }
       }
@@ -1207,7 +1238,8 @@ public class PhysicalPlan implements Serializable
     }
   }
 
-  public Operator loadOperator(PTOperator oper) {
+  public Operator loadOperator(PTOperator oper)
+  {
     try {
       LOG.debug("Loading state for {}", oper);
       return (Operator)oper.operatorMeta.getValue(OperatorContext.STORAGE_AGENT).load(oper.id, oper.isOperatorStateLess() ? Stateless.WINDOW_ID : oper.recoveryCheckpoint.windowId);
@@ -1298,7 +1330,8 @@ public class PhysicalPlan implements Serializable
     removePTOperator(oper);
   }
 
-  private PTOperator addPTOperator(PMapping nodeDecl, Partition<? extends Operator> partition, Checkpoint checkpoint) {
+  private PTOperator addPTOperator(PMapping nodeDecl, Partition<? extends Operator> partition, Checkpoint checkpoint)
+  {
     PTOperator oper = newOperator(nodeDecl.logicalOperator, nodeDecl.logicalOperator.getName());
     oper.recoveryCheckpoint = checkpoint;
 
@@ -1349,21 +1382,22 @@ public class PhysicalPlan implements Serializable
     oper.outputs.add(out);
   }
 
-  PTOperator newOperator(OperatorMeta om, String name) {
+  PTOperator newOperator(OperatorMeta om, String name)
+  {
     PTOperator oper = new PTOperator(this, idSequence.incrementAndGet(), name, om);
     allOperators.put(oper.id, oper);
-    oper.inputs = new ArrayList<PTInput>();
-    oper.outputs = new ArrayList<PTOutput>();
+    oper.inputs = new ArrayList<>();
+    oper.outputs = new ArrayList<>();
 
     this.ctx.recordEventAsync(new StramEvent.CreateOperatorEvent(oper.getName(), oper.getId()));
 
     return oper;
   }
 
-  private void setLocalityGrouping(PMapping pnodes, PTOperator newOperator, LocalityPrefs localityPrefs, Locality ltype,String host) {
-
+  private void setLocalityGrouping(PMapping pnodes, PTOperator newOperator, LocalityPrefs localityPrefs, Locality ltype,String host)
+  {
     HostOperatorSet grpObj = newOperator.getGrouping(ltype);
-    if(host!= null) {
+    if (host != null) {
       grpObj.setHost(host);
     }
     Set<PTOperator> s = grpObj.getOperatorSet();
@@ -1374,7 +1408,7 @@ public class PhysicalPlan implements Serializable
         if (pnodes.parallelPartitions == localPM.parallelPartitions) {
           if (localPM.partitions.size() >= pnodes.partitions.size()) {
             // apply locality setting per partition
-            s.addAll(localPM.partitions.get(pnodes.partitions.size()-1).getGrouping(ltype).getOperatorSet());
+            s.addAll(localPM.partitions.get(pnodes.partitions.size() - 1).getGrouping(ltype).getOperatorSet());
           }
         } else {
           for (PTOperator otherNode : localPM.partitions) {
@@ -1383,11 +1417,10 @@ public class PhysicalPlan implements Serializable
         }
       }
       for (PTOperator localOper : s) {
-        if(grpObj.getHost() == null){
+        if (grpObj.getHost() == null) {
           grpObj.setHost(localOper.groupings.get(ltype).getHost());
-         }
+        }
         localOper.groupings.put(ltype, grpObj);
-
       }
     }
   }
@@ -1453,19 +1486,23 @@ public class PhysicalPlan implements Serializable
     this.ctx.recordEventAsync(new StramEvent.RemoveOperatorEvent(oper.getName(), oper.getId()));
   }
 
-  public PlanContext getContext() {
+  public PlanContext getContext()
+  {
     return ctx;
   }
 
-  public LogicalPlan getLogicalPlan() {
+  public LogicalPlan getLogicalPlan()
+  {
     return this.dag;
   }
 
-  public List<PTContainer> getContainers() {
+  public List<PTContainer> getContainers()
+  {
     return this.containers;
   }
 
-  public Map<Integer, PTOperator> getAllOperators() {
+  public Map<Integer, PTOperator> getAllOperators()
+  {
     return this.allOperators;
   }
 
@@ -1475,7 +1512,8 @@ public class PhysicalPlan implements Serializable
    * @param logicalOperator
    * @return
    */
-  public List<PTOperator> getOperators(OperatorMeta logicalOperator) {
+  public List<PTOperator> getOperators(OperatorMeta logicalOperator)
+  {
     return this.logicalToPTOperator.get(logicalOperator).partitions;
   }
 
@@ -1493,13 +1531,15 @@ public class PhysicalPlan implements Serializable
     return operators;
   }
 
-  public boolean hasMapping(OperatorMeta om) {
+  public boolean hasMapping(OperatorMeta om)
+  {
     return this.logicalToPTOperator.containsKey(om);
   }
 
   // used for testing only
   @VisibleForTesting
-  public List<PTOperator> getMergeOperators(OperatorMeta logicalOperator) {
+  public List<PTOperator> getMergeOperators(OperatorMeta logicalOperator)
+  {
     List<PTOperator> opers = Lists.newArrayList();
     for (StreamMapping ug : this.logicalToPTOperator.get(logicalOperator).outputStreams.values()) {
       ug.addTo(opers);
@@ -1507,11 +1547,13 @@ public class PhysicalPlan implements Serializable
     return opers;
   }
 
-  protected List<OperatorMeta> getRootOperators() {
+  protected List<OperatorMeta> getRootOperators()
+  {
     return dag.getRootOperators();
   }
 
-  private void getDeps(PTOperator operator, Set<PTOperator> visited) {
+  private void getDeps(PTOperator operator, Set<PTOperator> visited)
+  {
     visited.add(operator);
     for (PTInput in : operator.inputs) {
       if (in.source.isDownStreamInline()) {
@@ -1540,7 +1582,7 @@ public class PhysicalPlan implements Serializable
    */
   public Set<PTOperator> getDependents(Collection<PTOperator> operators)
   {
-    Set<PTOperator> visited = new LinkedHashSet<PTOperator>();
+    Set<PTOperator> visited = new LinkedHashSet<>();
     if (operators != null) {
       for (PTOperator operator: operators) {
         getDeps(operator, visited);
@@ -1552,7 +1594,7 @@ public class PhysicalPlan implements Serializable
 
   private Set<PTOperator> getDependentPersistOperators(Collection<PTOperator> operators)
   {
-    Set<PTOperator> persistOperators = new LinkedHashSet<PTOperator>();
+    Set<PTOperator> persistOperators = new LinkedHashSet<>();
     if (operators != null) {
       for (PTOperator operator : operators) {
         for (PTInput in : operator.inputs) {
@@ -1565,7 +1607,7 @@ public class PhysicalPlan implements Serializable
               }
             }
           }
-          for (Entry<InputPortMeta, OperatorMeta> entry : in.logicalStream.sinkSpecificPersistOperatorMap.entrySet()) {
+          for (Map.Entry<InputPortMeta, OperatorMeta> entry : in.logicalStream.sinkSpecificPersistOperatorMap.entrySet()) {
             // Redeploy sink specific persist operators
             persistOperators.addAll(getOperators(entry.getValue()));
           }
@@ -1755,9 +1797,11 @@ public class PhysicalPlan implements Serializable
           } else {
             LOG.debug("Scheduling repartitioning for {} load {}", oper, oper.loadIndicator);
             // hand over to monitor thread
-            Runnable r = new Runnable() {
+            Runnable r = new Runnable()
+            {
               @Override
-              public void run() {
+              public void run()
+              {
                 redoPartitions(logicalToPTOperator.get(om), rsp.repartitionNote);
                 pendingRepartition.remove(om);
               }
@@ -1775,8 +1819,8 @@ public class PhysicalPlan implements Serializable
           }
         }
         // for backward compatibility
-        if(rsp.operatorCommands != null){
-          for(@SuppressWarnings("deprecation") com.datatorrent.api.StatsListener.OperatorCommand cmd: rsp.operatorCommands){
+        if (rsp.operatorCommands != null) {
+          for (@SuppressWarnings("deprecation") com.datatorrent.api.StatsListener.OperatorCommand cmd : rsp.operatorCommands) {
             StramToNodeRequest request = new StramToNodeRequest();
             request.operatorId = oper.getId();
             request.requestType = StramToNodeRequest.RequestType.CUSTOM;
@@ -1811,7 +1855,8 @@ public class PhysicalPlan implements Serializable
     }
   }
 
-  public Integer getStreamCodecIdentifier(StreamCodec<?> streamCodecInfo) {
+  public Integer getStreamCodecIdentifier(StreamCodec<?> streamCodecInfo)
+  {
     Integer id;
     synchronized (streamCodecIdentifiers) {
       id = streamCodecIdentifiers.get(streamCodecInfo);
@@ -1824,7 +1869,8 @@ public class PhysicalPlan implements Serializable
   }
 
   @VisibleForTesting
-  public Map<StreamCodec<?>, Integer> getStreamCodecIdentifiers() {
+  public Map<StreamCodec<?>, Integer> getStreamCodecIdentifiers()
+  {
     return Collections.unmodifiableMap(streamCodecIdentifiers);
   }
 
@@ -1844,7 +1890,4 @@ public class PhysicalPlan implements Serializable
       return null;
     }
   }
-
-
-
 }
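
The hunks above replace DTThrowable.wrapIfChecked(e) with Guava's Throwables.propagate(e), which rethrows RuntimeException and Error unchanged and wraps checked exceptions in a RuntimeException. A minimal standalone sketch of that pattern, assuming only Guava on the classpath (the class and method names here are illustrative, not part of the patch):

    import com.google.common.base.Throwables;

    public class PropagateExample
    {
      // Mirrors the catch blocks above: callers need no throws clause because
      // propagate() wraps any checked exception in a RuntimeException.
      public static byte[] readState(java.io.File f)
      {
        try {
          return java.nio.file.Files.readAllBytes(f.toPath());
        } catch (Exception e) {
          throw Throwables.propagate(e); // propagate() returns a RuntimeException, so 'throw' satisfies the compiler
        }
      }
    }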

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/85db448e/engine/src/main/java/com/datatorrent/stram/plan/physical/PlanModifier.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/physical/PlanModifier.java b/engine/src/main/java/com/datatorrent/stram/plan/physical/PlanModifier.java
index 98abe9d..165517d 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/physical/PlanModifier.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/physical/PlanModifier.java
@@ -18,23 +18,23 @@
  */
 package com.datatorrent.stram.plan.physical;
 
-import com.datatorrent.api.Context.DAGContext;
 import java.util.Collections;
 import java.util.Map;
 
 import javax.validation.ValidationException;
 
-import com.datatorrent.stram.plan.logical.LogicalPlan;
-import com.datatorrent.stram.plan.logical.LogicalPlanConfiguration;
-import com.datatorrent.stram.plan.logical.Operators;
-import com.datatorrent.stram.plan.logical.LogicalPlan.InputPortMeta;
-import com.datatorrent.stram.plan.logical.LogicalPlan.OperatorMeta;
-import com.datatorrent.stram.plan.logical.LogicalPlan.StreamMeta;
+import com.datatorrent.api.Context.DAGContext;
 import com.datatorrent.api.Operator;
 import com.datatorrent.api.Operator.InputPort;
 import com.datatorrent.api.Operator.OutputPort;
 import com.datatorrent.api.StringCodec;
 import com.datatorrent.stram.StringCodecs;
+import com.datatorrent.stram.plan.logical.LogicalPlan;
+import com.datatorrent.stram.plan.logical.LogicalPlan.InputPortMeta;
+import com.datatorrent.stram.plan.logical.LogicalPlan.OperatorMeta;
+import com.datatorrent.stram.plan.logical.LogicalPlan.StreamMeta;
+import com.datatorrent.stram.plan.logical.LogicalPlanConfiguration;
+import com.datatorrent.stram.plan.logical.Operators;
 import com.datatorrent.stram.plan.logical.Operators.PortContextPair;
 
 /**
@@ -44,10 +44,11 @@ import com.datatorrent.stram.plan.logical.Operators.PortContextPair;
  *
  * @since 0.3.2
  */
-public class PlanModifier {
+public class PlanModifier
+{
 
-  final private LogicalPlan logicalPlan;
-  final private PhysicalPlan physicalPlan;
+  private final LogicalPlan logicalPlan;
+  private final PhysicalPlan physicalPlan;
 
   /**
    * For dry run on logical plan only
@@ -260,8 +261,7 @@ public class PlanModifier {
   public void setOperatorProperty(String operatorName, String propertyName, String propertyValue)
   {
     OperatorMeta om = assertGetOperator(operatorName);
-    if (physicalPlan != null)
-    {
+    if (physicalPlan != null) {
       for (PTOperator oper : physicalPlan.getOperators(om)) {
         if (!physicalPlan.newOpers.containsKey(oper)) {
           throw new ValidationException("Properties can only be set on new operators: " + om + " " + propertyName + " " + propertyValue);
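
The same conventions recur in every file of this patch: explicit imports instead of wildcards, class and method braces on their own line, JLS modifier order (private final rather than final private), spaces around operators, and the diamond operator for generic instantiations. A toy class showing them together (the class itself is hypothetical):

    import java.util.ArrayList; // explicit import, no java.util.*
    import java.util.List;

    public class StyleExample
    {
      private final List<String> names = new ArrayList<>(); // diamond, 'private final'

      public void add(String name)
      {
        for (int i = 0; i < 1; i++) { // spaces around '=' and '<'
          names.add(name);
        }
      }
    }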

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/85db448e/engine/src/main/java/com/datatorrent/stram/plan/physical/StatsRevisions.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/physical/StatsRevisions.java b/engine/src/main/java/com/datatorrent/stram/plan/physical/StatsRevisions.java
index 097fa20..5ff58a0 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/physical/StatsRevisions.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/physical/StatsRevisions.java
@@ -34,8 +34,8 @@ import org.slf4j.LoggerFactory;
  */
 public class StatsRevisions implements Serializable
 {
-  private final IdentityHashMap<Object, Integer> longsIndex = new IdentityHashMap<Object, Integer>();
-  private transient ThreadLocal<Revision> VERSION = new ThreadLocal<Revision>();
+  private final IdentityHashMap<Object, Integer> longsIndex = new IdentityHashMap<>();
+  private transient ThreadLocal<Revision> VERSION = new ThreadLocal<>();
   private Revision current = new Revision();
 
   public VersionedLong newVersionedLong()

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/85db448e/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java b/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java
index f30ceb6..e404d5a 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java
@@ -58,7 +58,7 @@ public class StreamMapping implements java.io.Serializable
 {
   private static final long serialVersionUID = 8572852828117485193L;
 
-  private final static Logger LOG = LoggerFactory.getLogger(StreamMapping.class);
+  private static final Logger LOG = LoggerFactory.getLogger(StreamMapping.class);
 
   private final StreamMeta streamMeta;
   private final PhysicalPlan plan;
@@ -67,13 +67,14 @@ public class StreamMapping implements java.io.Serializable
   final Set<PTOperator> slidingUnifiers = Sets.newHashSet();
   private final List<PTOutput> upstream = Lists.newArrayList();
 
-
-  public StreamMapping(StreamMeta streamMeta, PhysicalPlan plan) {
+  public StreamMapping(StreamMeta streamMeta, PhysicalPlan plan)
+  {
     this.streamMeta = streamMeta;
     this.plan = plan;
   }
 
-  void addTo(Collection<PTOperator> opers) {
+  void addTo(Collection<PTOperator> opers)
+  {
     if (finalUnifier != null) {
       opers.add(finalUnifier);
     }
@@ -81,7 +82,8 @@ public class StreamMapping implements java.io.Serializable
     opers.addAll(slidingUnifiers);
   }
 
-  public void setSources(Collection<PTOperator> partitions) {
+  public void setSources(Collection<PTOperator> partitions)
+  {
     upstream.clear();
     // add existing inputs
     for (PTOperator uoper : partitions) {
@@ -94,10 +96,12 @@ public class StreamMapping implements java.io.Serializable
     redoMapping();
   }
 
-  public static PTOperator createSlidingUnifier(StreamMeta streamMeta, PhysicalPlan plan, int operatorApplicationWindowCount, int slidingWindowCount)
+  public static PTOperator createSlidingUnifier(StreamMeta streamMeta, PhysicalPlan plan, int
+      operatorApplicationWindowCount, int slidingWindowCount)
   {
     int gcd = IntMath.gcd(operatorApplicationWindowCount, slidingWindowCount);
-    OperatorMeta um = streamMeta.getSource().getSlidingUnifier(operatorApplicationWindowCount / gcd, gcd, slidingWindowCount / gcd);
+    OperatorMeta um = streamMeta.getSource()
+        .getSlidingUnifier(operatorApplicationWindowCount / gcd, gcd, slidingWindowCount / gcd);
     PTOperator pu = plan.newOperator(um, um.getName());
 
     Operator unifier = um.getOperator();
@@ -135,36 +139,36 @@ public class StreamMapping implements java.io.Serializable
     OperatorMeta sourceOM = streamMeta.getSource().getOperatorMeta();
     if (sourceOM.getAttributes().contains(Context.OperatorContext.SLIDE_BY_WINDOW_COUNT)) {
       if (sourceOM.getValue(Context.OperatorContext.SLIDE_BY_WINDOW_COUNT) <
-        sourceOM.getValue(Context.OperatorContext.APPLICATION_WINDOW_COUNT)) {
+          sourceOM.getValue(Context.OperatorContext.APPLICATION_WINDOW_COUNT)) {
         plan.undeployOpers.addAll(slidingUnifiers);
         slidingUnifiers.clear();
         List<PTOutput> newUpstream = Lists.newArrayList();
         PTOperator slidingUnifier;
         for (PTOutput source : upstream) {
           slidingUnifier = StreamMapping.createSlidingUnifier(streamMeta, plan,
-            sourceOM.getValue(Context.OperatorContext.APPLICATION_WINDOW_COUNT),
-            sourceOM.getValue(Context.OperatorContext.SLIDE_BY_WINDOW_COUNT));
+              sourceOM.getValue(Context.OperatorContext.APPLICATION_WINDOW_COUNT),
+              sourceOM.getValue(Context.OperatorContext.SLIDE_BY_WINDOW_COUNT));
           addInput(slidingUnifier, source, null);
           this.slidingUnifiers.add(slidingUnifier);
           newUpstream.add(slidingUnifier.outputs.get(0));
         }
         upstream.clear();
         upstream.addAll(newUpstream);
-      }
-      else {
+      } else {
         LOG.warn("Sliding Window Count {} should be less than APPLICATION WINDOW COUNT {}", sourceOM.getValue(Context.OperatorContext.SLIDE_BY_WINDOW_COUNT), sourceOM.getValue(Context.OperatorContext.APPLICATION_WINDOW_COUNT));
       }
     }
   }
 
   @SuppressWarnings("AssignmentToForLoopParameter")
-  private List<PTOutput> setupCascadingUnifiers(List<PTOutput> upstream, List<PTOperator> pooledUnifiers, int limit, int level) {
+  private List<PTOutput> setupCascadingUnifiers(List<PTOutput> upstream, List<PTOperator> pooledUnifiers, int limit, int level)
+  {
     List<PTOutput> nextLevel = Lists.newArrayList();
     PTOperator pu = null;
-    for (int i=0; i<upstream.size(); i++) {
+    for (int i = 0; i < upstream.size(); i++) {
       if (i % limit == 0) {
         if (upstream.size() - i < limit) {
-          while(i< upstream.size()) {
+          while (i < upstream.size()) {
             nextLevel.add(upstream.get(i));
             i++;
           }
@@ -195,7 +199,8 @@ public class StreamMapping implements java.io.Serializable
    * rebuild the tree, which may cause more changes to execution layer than need be
    * TODO: investigate incremental logic
    */
-  private void redoMapping() {
+  private void redoMapping()
+  {
 
     Set<Pair<PTOperator, InputPortMeta>> downstreamOpers = Sets.newHashSet();
 
@@ -206,7 +211,7 @@ public class StreamMapping implements java.io.Serializable
       if (!ipm.getValue(PortContext.PARTITION_PARALLEL) && plan.hasMapping(ipm.getOperatorWrapper())) {
         List<PTOperator> partitions = plan.getOperators(ipm.getOperatorWrapper());
         for (PTOperator doper : partitions) {
-          downstreamOpers.add(new Pair<PTOperator, InputPortMeta>(doper, ipm));
+          downstreamOpers.add(new Pair<>(doper, ipm));
         }
       }
     }
@@ -273,7 +278,7 @@ public class StreamMapping implements java.io.Serializable
         PartitionKeys pks = partKeys != null ? partKeys.get(doperEntry.second) : null;
         Boolean sinkSingleFinal = doperEntry.second.getAttributes().get(PortContext.UNIFIER_SINGLE_FINAL);
         boolean lastSingle = (sinkSingleFinal != null) ? sinkSingleFinal :
-                                (sourceSingleFinal != null ? sourceSingleFinal.booleanValue() : PortContext.UNIFIER_SINGLE_FINAL.defaultValue);
+            (sourceSingleFinal != null ? sourceSingleFinal.booleanValue() : PortContext.UNIFIER_SINGLE_FINAL.defaultValue);
 
         if (upstream.size() > 1) {
           if (!separateUnifiers && ((pks == null || pks.mask == 0) || lastSingle)) {
@@ -329,15 +334,16 @@ public class StreamMapping implements java.io.Serializable
       // 1) Upstream operator partitions are scaled down to one. (no unifier needed)
       // 2) Downstream operators partitions are scaled up from one to multiple. (replaced by merged unifier)
       if (finalUnifier != null && finalUnifier.inputs.isEmpty()) {
-          plan.removePTOperator(finalUnifier);
-          finalUnifier = null;
+        plan.removePTOperator(finalUnifier);
+        finalUnifier = null;
       }
 
     }
 
   }
 
-  private void setInput(PTOperator oper, InputPortMeta ipm, PTOperator sourceOper, PartitionKeys pks) {
+  private void setInput(PTOperator oper, InputPortMeta ipm, PTOperator sourceOper, PartitionKeys pks)
+  {
     // TODO: see if this can be handled more efficiently
     for (PTInput in : oper.inputs) {
       if (in.source.source == sourceOper && in.logicalStream == streamMeta && ipm.getPortName().equals(in.portName)) {
@@ -360,7 +366,8 @@ public class StreamMapping implements java.io.Serializable
     target.inputs.add(input);
   }
 
-  private void detachUnifier(PTOperator unifier) {
+  private void detachUnifier(PTOperator unifier)
+  {
     // remove existing unifiers from downstream inputs
     for (PTOutput out : unifier.outputs) {
       for (PTInput input : out.sinks) {
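
createSlidingUnifier above divides both window counts by their greatest common divisor before configuring the unifier, using Guava's IntMath.gcd. A small sketch of just that arithmetic (the helper class is illustrative):

    import com.google.common.math.IntMath;

    public class SlidingWindowRatio
    {
      // Returns the three values passed to getSlidingUnifier():
      // appWindowCount/gcd, gcd, slideByWindowCount/gcd.
      public static int[] reduced(int appWindowCount, int slideByWindowCount)
      {
        int gcd = IntMath.gcd(appWindowCount, slideByWindowCount);
        return new int[]{appWindowCount / gcd, gcd, slideByWindowCount / gcd};
      }

      public static void main(String[] args)
      {
        int[] r = reduced(4, 2); // e.g. APPLICATION_WINDOW_COUNT=4, SLIDE_BY_WINDOW_COUNT=2
        System.out.println(r[0] + " " + r[1] + " " + r[2]); // prints: 2 2 1
      }
    }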

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/85db448e/engine/src/main/java/com/datatorrent/stram/security/KerberosAuth.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/security/KerberosAuth.java b/engine/src/main/java/com/datatorrent/stram/security/KerberosAuth.java
index 15137e5..20cde63 100644
--- a/engine/src/main/java/com/datatorrent/stram/security/KerberosAuth.java
+++ b/engine/src/main/java/com/datatorrent/stram/security/KerberosAuth.java
@@ -20,8 +20,13 @@ package com.datatorrent.stram.security;
 
 import java.io.IOException;
 import java.util.HashMap;
+
 import javax.security.auth.Subject;
-import javax.security.auth.callback.*;
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
 import javax.security.auth.login.AppConfigurationEntry;
 import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag;
 import javax.security.auth.login.Configuration;
@@ -36,21 +41,23 @@ import javax.security.auth.login.LoginException;
 public class KerberosAuth
 {
 
-  public static Subject loginUser(String principal, char[] password) throws LoginException, IOException {
+  public static Subject loginUser(String principal, char[] password) throws LoginException, IOException
+  {
     Subject subject = new Subject();
-    LoginContext lc = new LoginContext(com.datatorrent.stram.security.KerberosAuth.class.getName(), subject, new AuthenticationHandler(principal, password),
-                                                                            new KerberosConfiguration(principal));
+    LoginContext lc = new LoginContext(com.datatorrent.stram.security.KerberosAuth.class.getName(), subject, new AuthenticationHandler(principal, password), new KerberosConfiguration(principal));
     lc.login();
     return subject;
     //return UserGroupInformation.getUGIFromTicketCache(ticketCache, principal);
   }
 
-  private static class AuthenticationHandler implements CallbackHandler {
+  private static class AuthenticationHandler implements CallbackHandler
+  {
 
     private final String principal;
     private final char[] password;
 
-    AuthenticationHandler(String principal, char[] password) {
+    AuthenticationHandler(String principal, char[] password)
+    {
       this.principal = principal;
       this.password = password;
     }
@@ -71,22 +78,26 @@ public class KerberosAuth
 
   }
 
-  private static class KerberosConfiguration extends Configuration {
+  private static class KerberosConfiguration extends Configuration
+  {
 
     private final String principal;
 
-    KerberosConfiguration(String principal) {
+    KerberosConfiguration(String principal)
+    {
       this.principal = principal;
     }
 
     @Override
-    public AppConfigurationEntry[] getAppConfigurationEntry(String name) {
+    public AppConfigurationEntry[] getAppConfigurationEntry(String name)
+    {
       if (name.equals(com.datatorrent.stram.security.KerberosAuth.class.getName())) {
         AppConfigurationEntry[] configEntries = new AppConfigurationEntry[1];
-        HashMap<String,String> params = new HashMap<String, String>();
+        HashMap<String, String> params = new HashMap<>();
         params.put("useTicketCache", "true");
         params.put("principal", principal);
-        configEntries[0] = new AppConfigurationEntry("com.sun.security.auth.module.Krb5LoginModule", LoginModuleControlFlag.REQUIRED, params );
+        configEntries[0] = new AppConfigurationEntry("com.sun.security.auth.module.Krb5LoginModule",
+            LoginModuleControlFlag.REQUIRED, params);
         return configEntries;
       } else {
         return null;
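
KerberosConfiguration above builds its Krb5LoginModule entry programmatically rather than from a jaas.conf file. A self-contained sketch of that JAAS pattern, using only the standard javax.security.auth API (the configuration class name is illustrative):

    import java.util.HashMap;

    import javax.security.auth.login.AppConfigurationEntry;
    import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag;
    import javax.security.auth.login.Configuration;

    public class TicketCacheJaasConfig extends Configuration
    {
      @Override
      public AppConfigurationEntry[] getAppConfigurationEntry(String name)
      {
        HashMap<String, String> params = new HashMap<>();
        params.put("useTicketCache", "true"); // reuse the ticket obtained via kinit
        return new AppConfigurationEntry[]{
            new AppConfigurationEntry("com.sun.security.auth.module.Krb5LoginModule",
                LoginModuleControlFlag.REQUIRED, params)
        };
      }
    }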

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/85db448e/engine/src/main/java/com/datatorrent/stram/security/StramDelegationTokenIdentifier.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/security/StramDelegationTokenIdentifier.java b/engine/src/main/java/com/datatorrent/stram/security/StramDelegationTokenIdentifier.java
index 9e1477f..d4353f9 100644
--- a/engine/src/main/java/com/datatorrent/stram/security/StramDelegationTokenIdentifier.java
+++ b/engine/src/main/java/com/datatorrent/stram/security/StramDelegationTokenIdentifier.java
@@ -30,10 +30,12 @@ public class StramDelegationTokenIdentifier extends AbstractDelegationTokenIdent
 {
   public static final Text IDENTIFIER_KIND = new Text("DT_DELEGATION_TOKEN");
 
-  public StramDelegationTokenIdentifier() {
+  public StramDelegationTokenIdentifier()
+  {
   }
 
-  public StramDelegationTokenIdentifier(Text owner, Text renewer, Text realUser) {
+  public StramDelegationTokenIdentifier(Text owner, Text renewer, Text realUser)
+  {
     super(owner, renewer, realUser);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/85db448e/engine/src/main/java/com/datatorrent/stram/security/StramDelegationTokenManager.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/security/StramDelegationTokenManager.java b/engine/src/main/java/com/datatorrent/stram/security/StramDelegationTokenManager.java
index 681dbc0..1b754e2 100644
--- a/engine/src/main/java/com/datatorrent/stram/security/StramDelegationTokenManager.java
+++ b/engine/src/main/java/com/datatorrent/stram/security/StramDelegationTokenManager.java
@@ -28,9 +28,11 @@ import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecret
 public class StramDelegationTokenManager extends AbstractDelegationTokenSecretManager<StramDelegationTokenIdentifier>
 {
 
-  public StramDelegationTokenManager(long delegationKeyUpdateInterval, long delegationTokenMaxLifetime, long delegationTokenRenewInterval,
-                                     long delegationTokenRemoverScanInterval) {
-    super(delegationKeyUpdateInterval,delegationTokenMaxLifetime,delegationTokenRenewInterval,delegationTokenRemoverScanInterval);
+  public StramDelegationTokenManager(long delegationKeyUpdateInterval, long delegationTokenMaxLifetime,
+      long delegationTokenRenewInterval, long delegationTokenRemoverScanInterval)
+  {
+    super(delegationKeyUpdateInterval, delegationTokenMaxLifetime, delegationTokenRenewInterval,
+        delegationTokenRemoverScanInterval);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/85db448e/engine/src/main/java/com/datatorrent/stram/security/StramDelegationTokenSelector.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/security/StramDelegationTokenSelector.java b/engine/src/main/java/com/datatorrent/stram/security/StramDelegationTokenSelector.java
index 3b8becf..ff0b2fb 100644
--- a/engine/src/main/java/com/datatorrent/stram/security/StramDelegationTokenSelector.java
+++ b/engine/src/main/java/com/datatorrent/stram/security/StramDelegationTokenSelector.java
@@ -19,12 +19,14 @@
 package com.datatorrent.stram.security;
 
 import java.util.Collection;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.security.token.TokenSelector;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * <p>StramDelegationTokenSelector class.</p>
@@ -34,16 +36,15 @@ import org.slf4j.LoggerFactory;
 public class StramDelegationTokenSelector implements TokenSelector<StramDelegationTokenIdentifier>
 {
 
-  private static final Logger LOG= LoggerFactory.getLogger(StramDelegationTokenSelector.class);
+  private static final Logger LOG = LoggerFactory.getLogger(StramDelegationTokenSelector.class);
 
   @Override
   public Token<StramDelegationTokenIdentifier> selectToken(Text text, Collection<Token<? extends TokenIdentifier>> clctn)
   {
     Token<StramDelegationTokenIdentifier> token = null;
-    if (text  != null) {
+    if (text != null) {
       for (Token<? extends TokenIdentifier> ctoken : clctn) {
-        if (StramDelegationTokenIdentifier.IDENTIFIER_KIND.equals(ctoken.getKind()) && text.equals(ctoken.getService()))
-        {
+        if (StramDelegationTokenIdentifier.IDENTIFIER_KIND.equals(ctoken.getKind()) && text.equals(ctoken.getService())) {
           token = (Token<StramDelegationTokenIdentifier>)ctoken;
         }
       }

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/85db448e/engine/src/main/java/com/datatorrent/stram/security/StramUserLogin.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/security/StramUserLogin.java b/engine/src/main/java/com/datatorrent/stram/security/StramUserLogin.java
index 7ebe42f..b2fa4e7 100644
--- a/engine/src/main/java/com/datatorrent/stram/security/StramUserLogin.java
+++ b/engine/src/main/java/com/datatorrent/stram/security/StramUserLogin.java
@@ -24,6 +24,9 @@ import java.net.InetSocketAddress;
 import java.security.PrivilegedExceptionAction;
 import java.util.Iterator;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.security.Credentials;
@@ -31,8 +34,6 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import com.datatorrent.api.StreamingApplication;
 
@@ -65,14 +66,13 @@ public class StramUserLogin
   public static void authenticate(String principal, String keytab) throws IOException
   {
     if ((principal != null) && !principal.isEmpty()
-      && (keytab != null) && !keytab.isEmpty()) {
+        && (keytab != null) && !keytab.isEmpty()) {
       try {
         UserGroupInformation.loginUserFromKeytab(principal, keytab);
         LOG.info("Login user {}", UserGroupInformation.getCurrentUser().getUserName());
         StramUserLogin.principal = principal;
         StramUserLogin.keytab = keytab;
-      }
-      catch (IOException ie) {
+      } catch (IOException ie) {
         LOG.error("Error login user with principal {}", principal, ie);
         throw ie;
       }
@@ -85,17 +85,14 @@ public class StramUserLogin
     //renew tokens
     final String tokenRenewer = conf.get(YarnConfiguration.RM_PRINCIPAL);
     if (tokenRenewer == null || tokenRenewer.length() == 0) {
-      throw new IOException(
-        "Can't get Master Kerberos principal for the RM to use as renewer");
+      throw new IOException("Can't get Master Kerberos principal for the RM to use as renewer");
     }
-    FileSystem fs = FileSystem.newInstance(conf);
+
     File keyTabFile;
-    try {
+    try (FileSystem fs = FileSystem.newInstance(conf)) {
       keyTabFile = FSUtil.copyToLocalFileSystem(fs, destinationDir, destinationFile, hdfsKeyTabFile, conf);
     }
-    finally {
-      fs.close();
-    }
+
     UserGroupInformation ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(UserGroupInformation.getCurrentUser().getUserName(), keyTabFile.getAbsolutePath());
     try {
       ugi.doAs(new PrivilegedExceptionAction<Object>()
@@ -103,7 +100,7 @@ public class StramUserLogin
         @Override
         public Object run() throws Exception
         {
-          FileSystem fs1 = FileSystem.newInstance(conf);
+
           YarnClient yarnClient = null;
           if (renewRMToken) {
             yarnClient = YarnClient.createYarnClient();
@@ -111,14 +108,12 @@ public class StramUserLogin
             yarnClient.start();
           }
           Credentials creds = new Credentials();
-          try {
+          try (FileSystem fs1 = FileSystem.newInstance(conf)) {
             fs1.addDelegationTokens(tokenRenewer, creds);
             if (renewRMToken) {
               new StramClientUtils.ClientRMHelper(yarnClient, conf).addRMDelegationToken(tokenRenewer, creds);
             }
-          }
-          finally {
-            fs1.close();
+          } finally {
             if (renewRMToken) {
               yarnClient.stop();
             }
@@ -129,12 +124,10 @@ public class StramUserLogin
         }
       });
       UserGroupInformation.getCurrentUser().addCredentials(credentials);
-    }
-    catch (InterruptedException e) {
+    } catch (InterruptedException e) {
       LOG.error("Error while renewing tokens ", e);
       expiryTime = System.currentTimeMillis();
-    }
-    catch (IOException e) {
+    } catch (IOException e) {
       LOG.error("Error while renewing tokens ", e);
       expiryTime = System.currentTimeMillis();
     }
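
Beyond formatting, this hunk replaces the explicit try/finally fs.close() with try-with-resources, for both the keytab copy and the delegation-token fetch. FileSystem implements Closeable, so the minimal shape is (a sketch, assuming a Configuration is in scope):

  import java.io.IOException;

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FileSystem;

  final class FsSketch
  {
    static void withFs(Configuration conf) throws IOException
    {
      try (FileSystem fs = FileSystem.newInstance(conf)) {
        // fs.close() runs automatically on exit, even on exception,
        // matching the finally block the commit removes.
      }
    }
  }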

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/85db448e/engine/src/main/java/com/datatorrent/stram/security/StramWSFilter.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/security/StramWSFilter.java b/engine/src/main/java/com/datatorrent/stram/security/StramWSFilter.java
index 061bdc7..375462d 100644
--- a/engine/src/main/java/com/datatorrent/stram/security/StramWSFilter.java
+++ b/engine/src/main/java/com/datatorrent/stram/security/StramWSFilter.java
@@ -27,7 +27,12 @@ import java.util.HashSet;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import javax.servlet.*;
+import javax.servlet.Filter;
+import javax.servlet.FilterChain;
+import javax.servlet.FilterConfig;
+import javax.servlet.ServletException;
+import javax.servlet.ServletRequest;
+import javax.servlet.ServletResponse;
 import javax.servlet.http.Cookie;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
@@ -98,9 +103,9 @@ public class StramWSFilter implements Filter
   protected Set<String> getProxyAddresses() throws ServletException
   {
     long now = System.currentTimeMillis();
-    synchronized(this) {
-      if(proxyAddresses == null || (lastUpdate + updateInterval) >= now) {
-        proxyAddresses = new HashSet<String>();
+    synchronized (this) {
+      if (proxyAddresses == null || (lastUpdate + updateInterval) >= now) {
+        proxyAddresses = new HashSet<>();
         for (String proxyHost : proxyHosts) {
           try {
             logger.debug("resolving proxy hostname {}", proxyHost);
@@ -126,10 +131,9 @@ public class StramWSFilter implements Filter
   }
 
   @Override
-  public void doFilter(ServletRequest req, ServletResponse resp,
-                       FilterChain chain) throws IOException, ServletException
+  public void doFilter(ServletRequest req, ServletResponse resp, FilterChain chain) throws IOException, ServletException
   {
-    if(!(req instanceof HttpServletRequest)) {
+    if (!(req instanceof HttpServletRequest)) {
       throw new ServletException("This filter only works for HTTP/HTTPS");
     }
 
@@ -139,10 +143,10 @@ public class StramWSFilter implements Filter
     String requestURI = httpReq.getRequestURI();
     boolean authenticate = true;
     String user = null;
-    if(getProxyAddresses().contains(httpReq.getRemoteAddr())) {
+    if (getProxyAddresses().contains(httpReq.getRemoteAddr())) {
       if (httpReq.getCookies() != null) {
-        for(Cookie c: httpReq.getCookies()) {
-          if(WEBAPP_PROXY_USER.equals(c.getName())){
+        for (Cookie c : httpReq.getCookies()) {
+          if (WEBAPP_PROXY_USER.equals(c.getName())) {
             user = c.getValue();
             break;
           }
@@ -186,7 +190,7 @@ public class StramWSFilter implements Filter
       }
     }
 
-    if(user == null) {
+    if (user == null) {
       logger.debug("{}: could not find user, so user principal will not be set", remoteAddr);
       chain.doFilter(req, resp);
     } else {
@@ -202,14 +206,14 @@ public class StramWSFilter implements Filter
     //tokenIdentifier.setSequenceNumber(sequenceNumber.getAndAdd(1));
     //byte[] password = tokenManager.addIdentifier(tokenIdentifier);
     //Token<StramDelegationTokenIdentifier> token = new Token<StramDelegationTokenIdentifier>(tokenIdentifier.getBytes(), password, tokenIdentifier.getKind(), new Text(service));
-    Token<StramDelegationTokenIdentifier> token = new Token<StramDelegationTokenIdentifier>(tokenIdentifier, tokenManager);
+    Token<StramDelegationTokenIdentifier> token = new Token<>(tokenIdentifier, tokenManager);
     token.setService(new Text(service));
     return token.encodeToUrlString();
   }
 
   private String verifyClientToken(String tokenstr, String cid) throws IOException
   {
-    Token<StramDelegationTokenIdentifier> token = new Token<StramDelegationTokenIdentifier>();
+    Token<StramDelegationTokenIdentifier> token = new Token<>();
     try {
       token.decodeFromUrlString(tokenstr);
     } catch (IOException e) {
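
The recurring substitution in this file (and in StramWSFilterInitializer below) is the Java 7 diamond operator: the constructor's type arguments are inferred from the declaration, so the two sides can never drift apart. A minimal sketch:

  import java.util.ArrayList;
  import java.util.Collection;
  import java.util.HashMap;
  import java.util.Map;

  final class DiamondSketch
  {
    // Before: new HashMap<String, String>(); after: the diamond form.
    Map<String, String> params = new HashMap<>();
    Collection<String> proxies = new ArrayList<>();
  }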

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/85db448e/engine/src/main/java/com/datatorrent/stram/security/StramWSFilterInitializer.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/security/StramWSFilterInitializer.java b/engine/src/main/java/com/datatorrent/stram/security/StramWSFilterInitializer.java
index b090d09..61e06dd 100644
--- a/engine/src/main/java/com/datatorrent/stram/security/StramWSFilterInitializer.java
+++ b/engine/src/main/java/com/datatorrent/stram/security/StramWSFilterInitializer.java
@@ -52,8 +52,8 @@ public class StramWSFilterInitializer extends FilterInitializer
   public void initFilter(FilterContainer container, Configuration conf)
   {
     logger.debug("Conf {}", conf);
-    Map<String, String> params = new HashMap<String, String>();
-    Collection<String> proxies = new ArrayList<String>();
+    Map<String, String> params = new HashMap<>();
+    Collection<String> proxies = new ArrayList<>();
     if (ConfigUtils.isRMHAEnabled(conf)) {
       // HA is enabled get all
       for (String rmId : ConfigUtils.getRMHAIds(conf)) {

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/85db448e/engine/src/main/java/com/datatorrent/stram/security/StramWSPrincipal.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/security/StramWSPrincipal.java b/engine/src/main/java/com/datatorrent/stram/security/StramWSPrincipal.java
index 4dded8e..c747e2f 100644
--- a/engine/src/main/java/com/datatorrent/stram/security/StramWSPrincipal.java
+++ b/engine/src/main/java/com/datatorrent/stram/security/StramWSPrincipal.java
@@ -26,15 +26,18 @@ import java.security.Principal;
  *
  * @since 0.9.2
  */
-public class StramWSPrincipal implements Principal {
+public class StramWSPrincipal implements Principal
+{
   private final String name;
 
-  public StramWSPrincipal(String name) {
+  public StramWSPrincipal(String name)
+  {
     this.name = name;
   }
 
   @Override
-  public String getName() {
+  public String getName()
+  {
     return name;
   }
 }
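
StramWSPrincipal and StramWSServletRequestWrapper change only in brace placement, which is the convention APEXCORE-423 applies throughout: type and method declarations open the brace on its own line, while control-flow statements keep the brace on the same line. A sketch of the target style:

  public class StyleSketch
  {
    public String pick(boolean flag)
    {
      if (flag) {
        return "a";
      } else {
        return "b";
      }
    }
  }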

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/85db448e/engine/src/main/java/com/datatorrent/stram/security/StramWSServletRequestWrapper.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/security/StramWSServletRequestWrapper.java b/engine/src/main/java/com/datatorrent/stram/security/StramWSServletRequestWrapper.java
index a6384af..8e0236e 100644
--- a/engine/src/main/java/com/datatorrent/stram/security/StramWSServletRequestWrapper.java
+++ b/engine/src/main/java/com/datatorrent/stram/security/StramWSServletRequestWrapper.java
@@ -18,35 +18,41 @@
  */
 package com.datatorrent.stram.security;
 
+import java.security.Principal;
+
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletRequestWrapper;
-import java.security.Principal;
 
 /**
  * Borrowed from
  *
  * @since 0.9.2
  */
-public class StramWSServletRequestWrapper extends HttpServletRequestWrapper {
+public class StramWSServletRequestWrapper extends HttpServletRequestWrapper
+{
   private final StramWSPrincipal principal;
 
-  public StramWSServletRequestWrapper(HttpServletRequest request, StramWSPrincipal principal) {
+  public StramWSServletRequestWrapper(HttpServletRequest request, StramWSPrincipal principal)
+  {
     super(request);
     this.principal = principal;
   }
 
   @Override
-  public Principal getUserPrincipal() {
+  public Principal getUserPrincipal()
+  {
     return principal;
   }
 
   @Override
-  public String getRemoteUser() {
+  public String getRemoteUser()
+  {
     return principal.getName();
   }
 
   @Override
-  public boolean isUserInRole(String role) {
+  public boolean isUserInRole(String role)
+  {
     //No role info so far
     return false;
   }

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/85db448e/engine/src/main/java/com/datatorrent/stram/stream/BufferServerPublisher.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/stream/BufferServerPublisher.java b/engine/src/main/java/com/datatorrent/stram/stream/BufferServerPublisher.java
index c11ba2b..7db4892 100644
--- a/engine/src/main/java/com/datatorrent/stram/stream/BufferServerPublisher.java
+++ b/engine/src/main/java/com/datatorrent/stram/stream/BufferServerPublisher.java
@@ -25,9 +25,15 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.datatorrent.api.StreamCodec;
-
 import com.datatorrent.bufferserver.client.Publisher;
-import com.datatorrent.bufferserver.packet.*;
+import com.datatorrent.bufferserver.packet.BeginWindowTuple;
+import com.datatorrent.bufferserver.packet.DataTuple;
+import com.datatorrent.bufferserver.packet.EndStreamTuple;
+import com.datatorrent.bufferserver.packet.EndWindowTuple;
+import com.datatorrent.bufferserver.packet.MessageType;
+import com.datatorrent.bufferserver.packet.PayloadTuple;
+import com.datatorrent.bufferserver.packet.ResetWindowTuple;
+import com.datatorrent.bufferserver.packet.WindowIdTuple;
 import com.datatorrent.bufferserver.util.Codec;
 import com.datatorrent.netlet.EventLoop;
 import com.datatorrent.stram.codec.StatefulStreamCodec;
@@ -104,12 +110,10 @@ public class BufferServerPublisher extends Publisher implements ByteCounterStrea
         default:
           throw new UnsupportedOperationException("this data type is not handled in the stream");
       }
-    }
-    else {
+    } else {
       if (statefulSerde == null) {
         array = PayloadTuple.getSerializedTuple(serde.getPartition(payload), serde.toByteArray(payload));
-      }
-      else {
+      } else {
         DataStatePair dsp = statefulSerde.toDataStatePair(payload);
         /*
          * if there is any state write that for the subscriber before we write the data.
@@ -136,8 +140,7 @@ public class BufferServerPublisher extends Publisher implements ByteCounterStrea
         sleep(5);
       }
       publishedByteCount.addAndGet(array.length);
-    }
-    catch (InterruptedException ie) {
+    } catch (InterruptedException ie) {
       throw new RuntimeException(ie);
     }
   }
@@ -178,12 +181,10 @@ public class BufferServerPublisher extends Publisher implements ByteCounterStrea
   {
     StreamCodec<?> codec = context.get(StreamContext.CODEC);
     if (codec == null) {
-      statefulSerde = ((StatefulStreamCodec < Object >)StreamContext.CODEC.defaultValue).newInstance();
-    }
-    else if (codec instanceof StatefulStreamCodec) {
+      statefulSerde = ((StatefulStreamCodec<Object>)StreamContext.CODEC.defaultValue).newInstance();
+    } else if (codec instanceof StatefulStreamCodec) {
       statefulSerde = ((StatefulStreamCodec<Object>)codec).newInstance();
-    }
-    else {
+    } else {
       serde = (StreamCodec<Object>)codec;
     }
   }
@@ -208,8 +209,7 @@ public class BufferServerPublisher extends Publisher implements ByteCounterStrea
   {
     try {
       return count;
-    }
-    finally {
+    } finally {
       if (reset) {
         count = 0;
       }
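
The getCount(boolean) hunk keeps an idiom worth noting: the finally block runs after the return expression has been evaluated, so the caller receives the pre-reset count. A standalone sketch of the same behavior:

  final class CounterSketch
  {
    private long count = 42;

    long getCount(boolean reset)
    {
      try {
        return count;     // the return value is captured here
      } finally {
        if (reset) {
          count = 0;      // runs before control reaches the caller
        }
      }
    }
  }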

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/85db448e/engine/src/main/java/com/datatorrent/stram/stream/BufferServerSubscriber.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/stream/BufferServerSubscriber.java b/engine/src/main/java/com/datatorrent/stram/stream/BufferServerSubscriber.java
index 9441e76..d5b0997 100644
--- a/engine/src/main/java/com/datatorrent/stram/stream/BufferServerSubscriber.java
+++ b/engine/src/main/java/com/datatorrent/stram/stream/BufferServerSubscriber.java
@@ -29,12 +29,11 @@ import org.slf4j.LoggerFactory;
 
 import com.datatorrent.api.Sink;
 import com.datatorrent.api.StreamCodec;
-
 import com.datatorrent.bufferserver.client.Subscriber;
 import com.datatorrent.bufferserver.util.Codec;
-import com.datatorrent.netlet.util.Slice;
 import com.datatorrent.netlet.EventLoop;
 import com.datatorrent.netlet.util.CircularBuffer;
+import com.datatorrent.netlet.util.Slice;
 import com.datatorrent.stram.codec.StatefulStreamCodec;
 import com.datatorrent.stram.codec.StatefulStreamCodec.DataStatePair;
 import com.datatorrent.stram.engine.ByteCounterStream;
@@ -42,7 +41,11 @@ import com.datatorrent.stram.engine.StreamContext;
 import com.datatorrent.stram.engine.SweepableReservoir;
 import com.datatorrent.stram.engine.WindowGenerator;
 import com.datatorrent.stram.plan.logical.StreamCodecWrapperForPersistance;
-import com.datatorrent.stram.tuple.*;
+import com.datatorrent.stram.tuple.CheckpointTuple;
+import com.datatorrent.stram.tuple.EndStreamTuple;
+import com.datatorrent.stram.tuple.EndWindowTuple;
+import com.datatorrent.stram.tuple.ResetWindowTuple;
+import com.datatorrent.stram.tuple.Tuple;
 
 /**
  * Implement tuple flow from buffer server to the node in a logical stream<p>
@@ -71,12 +74,12 @@ public class BufferServerSubscriber extends Subscriber implements ByteCounterStr
   {
     super(id);
     this.reservoirs = new BufferReservoir[0];
-    this.reservoirMap = new HashMap<String, BufferReservoir>();
+    this.reservoirMap = new HashMap<>();
     this.readByteCount = new AtomicLong(0);
     this.dsp = new DataStatePair();
-    polledFragments = offeredFragments = new CircularBuffer<Slice>(queueCapacity);
-    freeFragments = new CircularBuffer<Slice>(queueCapacity);
-    backlog = new ArrayDeque<CircularBuffer<Slice>>();
+    polledFragments = offeredFragments = new CircularBuffer<>(queueCapacity);
+    freeFragments = new CircularBuffer<>(queueCapacity);
+    backlog = new ArrayDeque<>();
   }
 
   @Override
@@ -104,8 +107,7 @@ public class BufferServerSubscriber extends Subscriber implements ByteCounterStr
     Slice f;
     if (freeFragments.isEmpty()) {
       f = new Slice(buffer, offset, length);
-    }
-    else {
+    } else {
       f = freeFragments.pollUnsafe();
       f.buffer = buffer;
       f.offset = offset;
@@ -119,7 +121,7 @@ public class BufferServerSubscriber extends Subscriber implements ByteCounterStr
           suspended = true;
         }
         int newsize = offeredFragments.capacity() == MAX_SENDBUFFER_SIZE ? offeredFragments.capacity() : offeredFragments.capacity() << 1;
-        backlog.add(offeredFragments = new CircularBuffer<Slice>(newsize));
+        backlog.add(offeredFragments = new CircularBuffer<>(newsize));
         offeredFragments.add(f);
       }
     }
@@ -131,12 +133,10 @@ public class BufferServerSubscriber extends Subscriber implements ByteCounterStr
   {
     StreamCodec<?> codec = context.get(StreamContext.CODEC);
     if (codec == null) {
-      statefulSerde = ((StatefulStreamCodec <Object>)StreamContext.CODEC.defaultValue).newInstance();
-    }
-    else if (codec instanceof StatefulStreamCodec) {
+      statefulSerde = ((StatefulStreamCodec<Object>)StreamContext.CODEC.defaultValue).newInstance();
+    } else if (codec instanceof StatefulStreamCodec) {
       statefulSerde = ((StatefulStreamCodec<Object>)codec).newInstance();
-    }
-    else {
+    } else {
       serde = (StreamCodec<Object>)codec;
     }
     baseSeconds = context.getFinishedWindowId() & 0xffffffff00000000L;
@@ -262,8 +262,7 @@ public class BufferServerSubscriber extends Subscriber implements ByteCounterStr
     {
       try {
         return this.sink;
-      }
-      finally {
+      } finally {
         this.sink = sink;
       }
     }
@@ -387,8 +386,7 @@ public class BufferServerSubscriber extends Subscriber implements ByteCounterStr
     {
       try {
         return count;
-      }
-      finally {
+      } finally {
         if (reset) {
           count = 0;
         }
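
Aside from the diamond and brace cleanups, the backlog hunk makes the subscriber's growth policy easier to read: the offered-fragments buffer doubles in capacity until it reaches MAX_SENDBUFFER_SIZE, then stays flat. Reduced to a sketch:

  final class GrowthSketch
  {
    // Mirrors: capacity == MAX ? capacity : capacity << 1
    static int nextCapacity(int capacity, int max)
    {
      return capacity >= max ? max : capacity << 1;
    }
  }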

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/85db448e/engine/src/main/java/com/datatorrent/stram/stream/FastPublisher.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/stream/FastPublisher.java b/engine/src/main/java/com/datatorrent/stram/stream/FastPublisher.java
index 1ae2a16..88f2052 100644
--- a/engine/src/main/java/com/datatorrent/stram/stream/FastPublisher.java
+++ b/engine/src/main/java/com/datatorrent/stram/stream/FastPublisher.java
@@ -25,14 +25,20 @@ import java.nio.ByteOrder;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.SocketChannel;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import com.esotericsoftware.kryo.Kryo;
 import com.esotericsoftware.kryo.KryoException;
 import com.esotericsoftware.kryo.io.Output;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.datatorrent.bufferserver.packet.*;
+import com.datatorrent.bufferserver.packet.BeginWindowTuple;
+import com.datatorrent.bufferserver.packet.EndStreamTuple;
+import com.datatorrent.bufferserver.packet.EndWindowTuple;
+import com.datatorrent.bufferserver.packet.MessageType;
+import com.datatorrent.bufferserver.packet.PublishRequestTuple;
+import com.datatorrent.bufferserver.packet.ResetWindowTuple;
+import com.datatorrent.bufferserver.packet.WindowIdTuple;
 import com.datatorrent.netlet.EventLoop;
 import com.datatorrent.netlet.Listener;
 import com.datatorrent.netlet.Listener.ClientListener;
@@ -99,17 +105,14 @@ public class FastPublisher extends Kryo implements ClientListener, Stream
     int read;
     if ((read = channel.read(ByteBuffer.allocate(1))) > 0) {
       throw new RuntimeException("Publisher " + this + " is not supposed to receive any data");
-    }
-    else if (read == -1) {
+    } else if (read == -1) {
       try {
         channel.close();
-      }
-      finally {
+      } finally {
         unregistered(key);
         key.attach(Listener.NOOP_CLIENT_LISTENER);
       }
-    }
-    else {
+    } else {
       logger.debug("{} read 0 bytes", this);
     }
   }
@@ -136,14 +139,12 @@ public class FastPublisher extends Kryo implements ClientListener, Stream
         readBuffer.limit(0);
         if (readIndex == lastIndex) {
           readIndex = 0;
-        }
-        else {
+        } else {
           readIndex++;
         }
       }
       readBuffer = readBuffers[readIndex];
-    }
-    while (true);
+    } while (true);
   }
 
   @Override
@@ -245,16 +246,14 @@ public class FastPublisher extends Kryo implements ClientListener, Stream
         writeBuffer.put((byte)size);
         if (writeBuffer.hasRemaining()) {
           writeBuffer.put((byte)(size >> 8));
-        }
-        else {
+        } else {
           synchronized (readBuffers) {
             readBuffers[writeIndex].limit(BUFFER_CAPACITY);
           }
           advanceWriteBuffer();
           writeBuffer.put((byte)(size >> 8));
         }
-      }
-      else {
+      } else {
         synchronized (readBuffers) {
           readBuffers[writeIndex].limit(BUFFER_CAPACITY);
         }
@@ -279,17 +278,14 @@ public class FastPublisher extends Kryo implements ClientListener, Stream
             writeBuffer.put(array, offset, size);
             break;
           }
-        }
-        while (true);
-      }
-      else {
+        } while (true);
+      } else {
         writeBuffer.put(array);
       }
       synchronized (readBuffers) {
         readBuffers[writeIndex].limit(writeBuffer.position());
       }
-    }
-    else {
+    } else {
       count++;
       int hashcode = tuple.hashCode();
 
@@ -301,8 +297,7 @@ public class FastPublisher extends Kryo implements ClientListener, Stream
         writeBuffer.position(BUFFER_CAPACITY);
         advanceWriteBuffer();
         writeBuffer.position(newPosition - BUFFER_CAPACITY);
-      }
-      else {
+      } else {
         writeBuffer.position(newPosition);
       }
 
@@ -321,8 +316,7 @@ public class FastPublisher extends Kryo implements ClientListener, Stream
         synchronized (readBuffers[wi]) {
           readBuffers[wi].limit(writeBuffer.position());
         }
-      }
-      else {
+      } else {
         size = BUFFER_CAPACITY - position - 2 + writeBuffer.position();
         int index = writeIndex;
         synchronized (readBuffers[index]) {
@@ -332,8 +326,7 @@ public class FastPublisher extends Kryo implements ClientListener, Stream
         do {
           if (index == 0) {
             index = lastIndex;
-          }
-          else {
+          } else {
             index--;
           }
 
@@ -345,8 +338,7 @@ public class FastPublisher extends Kryo implements ClientListener, Stream
             readBuffers[index].limit(BUFFER_CAPACITY);
           }
           size += BUFFER_CAPACITY;
-        }
-        while (true);
+        } while (true);
         assert (size <= Short.MAX_VALUE);
         index = wi;
         switch (position) {
@@ -354,8 +346,7 @@ public class FastPublisher extends Kryo implements ClientListener, Stream
             position = 0;
             if (wi == lastIndex) {
               wi = 0;
-            }
-            else {
+            } else {
               wi++;
             }
             writeBuffers[wi].put(position++, (byte)size);
@@ -371,8 +362,7 @@ public class FastPublisher extends Kryo implements ClientListener, Stream
             writeBuffers[wi].put(position, (byte)size);
             if (wi == lastIndex) {
               wi = 0;
-            }
-            else {
+            } else {
               wi++;
             }
             position = 0;
@@ -389,8 +379,7 @@ public class FastPublisher extends Kryo implements ClientListener, Stream
             writeBuffers[wi].put(position, (byte)(size >> 8));
             if (wi == lastIndex) {
               wi = 0;
-            }
-            else {
+            } else {
               wi++;
             }
             position = 0;
@@ -407,8 +396,7 @@ public class FastPublisher extends Kryo implements ClientListener, Stream
             writeBuffers[wi].put(position, com.datatorrent.bufferserver.packet.MessageType.PAYLOAD_VALUE);
             if (wi == lastIndex) {
               wi = 0;
-            }
-            else {
+            } else {
               wi++;
             }
             position = 0;
@@ -425,8 +413,7 @@ public class FastPublisher extends Kryo implements ClientListener, Stream
             writeBuffers[wi].put(position, (byte)hashcode);
             if (wi == lastIndex) {
               wi = 0;
-            }
-            else {
+            } else {
               wi++;
             }
             position = 0;
@@ -443,8 +430,7 @@ public class FastPublisher extends Kryo implements ClientListener, Stream
             writeBuffers[wi].put(position, (byte)(hashcode >> 8));
             if (wi == lastIndex) {
               wi = 0;
-            }
-            else {
+            } else {
               wi++;
             }
             position = 0;
@@ -461,8 +447,7 @@ public class FastPublisher extends Kryo implements ClientListener, Stream
             writeBuffers[wi].put(position, (byte)(hashcode >> 16));
             if (wi == lastIndex) {
               wi = 0;
-            }
-            else {
+            } else {
               wi++;
             }
             position = 0;
@@ -497,8 +482,7 @@ public class FastPublisher extends Kryo implements ClientListener, Stream
   {
     if (writeIndex == lastIndex) {
       writeIndex = 0;
-    }
-    else {
+    } else {
       writeIndex++;
     }
 
@@ -509,8 +493,7 @@ public class FastPublisher extends Kryo implements ClientListener, Stream
 
       writeBuffer = writeBuffers[writeIndex];
       writeBuffer.clear();
-    }
-    catch (InterruptedException ie) {
+    } catch (InterruptedException ie) {
       throw new RuntimeException(ie);
     }
   }
@@ -521,8 +504,7 @@ public class FastPublisher extends Kryo implements ClientListener, Stream
     if (reset) {
       try {
         return count;
-      }
-      finally {
+      } finally {
         count = 0;
       }
     }
@@ -549,8 +531,7 @@ public class FastPublisher extends Kryo implements ClientListener, Stream
         writeBuffer.put(bytes, 0, remaining);
         advanceWriteBuffer();
         write(bytes, remaining, bytes.length - remaining);
-      }
-      else {
+      } else {
         writeBuffer.put(bytes);
       }
     }
@@ -843,17 +824,14 @@ public class FastPublisher extends Kryo implements ClientListener, Stream
           }
           if (charIndex < charCount) {
             advanceWriteBuffer();
-          }
-          else {
+          } else {
             break;
           }
-        }
-        while (true);
+        } while (true);
 
         int pos = writeBuffer.position() - 1;
         writeBuffer.put(pos, (byte)(writeBuffer.get(pos) | 0x80));
-      }
-      else {
+      } else {
         writeUtf8Length(charCount + 1);
         do {
           int c;
@@ -868,12 +846,10 @@ public class FastPublisher extends Kryo implements ClientListener, Stream
 
           if (charIndex < charCount) {
             advanceWriteBuffer();
-          }
-          else {
+          } else {
             break;
           }
-        }
-        while (true);
+        } while (true);
       }
     }
 
@@ -904,12 +880,10 @@ public class FastPublisher extends Kryo implements ClientListener, Stream
 
         if (charIndex < charCount) {
           advanceWriteBuffer();
-        }
-        else {
+        } else {
           break;
         }
-      }
-      while (true);
+      } while (true);
     }
 
     @Override
@@ -931,12 +905,10 @@ public class FastPublisher extends Kryo implements ClientListener, Stream
         }
         if (charIndex < charCount) {
           advanceWriteBuffer();
-        }
-        else {
+        } else {
           break;
         }
-      }
-      while (true);
+      } while (true);
 
       int pos = writeBuffer.position() - 1;
       writeBuffer.put(pos, (byte)(writeBuffer.get(pos) | 0x80));
@@ -949,13 +921,11 @@ public class FastPublisher extends Kryo implements ClientListener, Stream
         writeBuffer.put((byte)(value >>> 8));
         if (writeBuffer.hasRemaining()) {
           writeBuffer.put((byte)value);
-        }
-        else {
+        } else {
           advanceWriteBuffer();
           writeBuffer.put((byte)value);
         }
-      }
-      else {
+      } else {
         advanceWriteBuffer();
         writeBuffer.put((byte)(value >>> 8));
         writeBuffer.put((byte)value);
@@ -1619,8 +1589,7 @@ public class FastPublisher extends Kryo implements ClientListener, Stream
     {
       if (writeBuffer.hasRemaining()) {
         writeBuffer.put((byte)(value ? 1 : 0));
-      }
-      else {
+      } else {
         advanceWriteBuffer();
         writeBuffer.put((byte)(value ? 1 : 0));
       }
@@ -1663,8 +1632,7 @@ public class FastPublisher extends Kryo implements ClientListener, Stream
             writeBuffer.put((byte)(value | 0x80));
             break;
         }
-      }
-      else if (value >>> 13 == 0) {
+      } else if (value >>> 13 == 0) {
         switch (remaining) {
           case 0:
             advanceWriteBuffer();
@@ -1683,8 +1651,7 @@ public class FastPublisher extends Kryo implements ClientListener, Stream
             writeBuffer.put((byte)(value >>> 6));
             break;
         }
-      }
-      else if (value >>> 20 == 0) {
+      } else if (value >>> 20 == 0) {
         switch (remaining) {
           case 0:
             advanceWriteBuffer();
@@ -1710,8 +1677,7 @@ public class FastPublisher extends Kryo implements ClientListener, Stream
             writeBuffer.put((byte)(value >>> 13));
             break;
         }
-      }
-      else if (value >>> 27 == 0) {
+      } else if (value >>> 27 == 0) {
         switch (remaining) {
           case 0:
             advanceWriteBuffer();
@@ -1748,8 +1714,7 @@ public class FastPublisher extends Kryo implements ClientListener, Stream
             writeBuffer.put((byte)(value >>> 20));
             break;
         }
-      }
-      else {
+      } else {
         switch (remaining) {
           case 0:
             advanceWriteBuffer();
@@ -1821,8 +1786,7 @@ public class FastPublisher extends Kryo implements ClientListener, Stream
               remaining--;
               break;
           }
-        }
-        else if (c > 0x07FF) {
+        } else if (c > 0x07FF) {
           switch (remaining) {
             case 0:
               advanceWriteBuffer();
@@ -1855,8 +1819,7 @@ public class FastPublisher extends Kryo implements ClientListener, Stream
               remaining -= 3;
               break;
           }
-        }
-        else {
+        } else {
           switch (remaining) {
             case 0:
               advanceWriteBuffer();
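
Most of the FastPublisher changes collapse else/catch/while onto the closing brace, which makes the file's core pattern easier to scan: every write first checks whether the current buffer has room and rolls over to the next one if not. A self-contained sketch of that shape, with a stand-in for the class's advanceWriteBuffer():

  import java.nio.ByteBuffer;

  final class RolloverSketch
  {
    private ByteBuffer writeBuffer = ByteBuffer.allocate(8);

    // Stand-in for FastPublisher.advanceWriteBuffer(): swap in a fresh buffer.
    private void advanceWriteBuffer()
    {
      writeBuffer = ByteBuffer.allocate(8);
    }

    void put(byte b)
    {
      if (writeBuffer.hasRemaining()) {
        writeBuffer.put(b);
      } else {
        advanceWriteBuffer();
        writeBuffer.put(b);
      }
    }
  }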

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/85db448e/engine/src/main/java/com/datatorrent/stram/stream/InlineStream.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/stream/InlineStream.java b/engine/src/main/java/com/datatorrent/stram/stream/InlineStream.java
index b49a5b0..e909b80 100644
--- a/engine/src/main/java/com/datatorrent/stram/stream/InlineStream.java
+++ b/engine/src/main/java/com/datatorrent/stram/stream/InlineStream.java
@@ -81,8 +81,7 @@ public class InlineStream extends ForwardingReservoir implements Stream, Sweepab
   {
     try {
       super.put(tuple);
-    }
-    catch (InterruptedException ie) {
+    } catch (InterruptedException ie) {
       logger.debug("Interrupted", ie);
       throw new RuntimeException(ie);
     }
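
InlineStream's put wraps InterruptedException in a RuntimeException after logging it. A commonly recommended variant (not what this commit does; it only reformats the catch) also restores the thread's interrupt status before rethrowing, sketched here against a BlockingQueue:

  import java.util.concurrent.BlockingQueue;

  final class PutSketch
  {
    static <T> void put(BlockingQueue<T> queue, T tuple)
    {
      try {
        queue.put(tuple);
      } catch (InterruptedException ie) {
        Thread.currentThread().interrupt(); // preserve the interrupt flag
        throw new RuntimeException(ie);
      }
    }
  }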


