apex-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t..@apache.org
Subject [14/16] incubator-apex-core git commit: APEXCORE-423 style fix for apex-core engine
Date Sat, 09 Apr 2016 03:52:32 GMT
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/85db448e/engine/src/main/java/com/datatorrent/stram/api/OperatorDeployInfo.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/api/OperatorDeployInfo.java b/engine/src/main/java/com/datatorrent/stram/api/OperatorDeployInfo.java
index 22bebfc..bccbedf 100644
--- a/engine/src/main/java/com/datatorrent/stram/api/OperatorDeployInfo.java
+++ b/engine/src/main/java/com/datatorrent/stram/api/OperatorDeployInfo.java
@@ -19,7 +19,11 @@
 package com.datatorrent.stram.api;
 
 import java.io.Serializable;
-import java.util.*;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 import org.apache.commons.lang.builder.ToStringBuilder;
 import org.apache.commons.lang.builder.ToStringStyle;
@@ -143,7 +147,7 @@ public class OperatorDeployInfo implements Serializable, OperatorContext
     /*
      public StreamCodec streamCodec;
      */
-    public Map<Integer, StreamCodec<?>> streamCodecs = new HashMap<Integer, StreamCodec<?>>();
+    public Map<Integer, StreamCodec<?>> streamCodecs = new HashMap<>();
     /**
      * Partition keys for the input stream. Null w/o partitioning.
      */
@@ -158,14 +162,14 @@ public class OperatorDeployInfo implements Serializable, OperatorContext
     public String toString()
     {
       return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
-              .append("portName", this.portName)
-              .append("streamId", this.declaredStreamId)
-              .append("sourceNodeId", this.sourceNodeId)
-              .append("sourcePortName", this.sourcePortName)
-              .append("locality", this.locality)
-              .append("partitionMask", this.partitionMask)
-              .append("partitionKeys", this.partitionKeys)
-              .toString();
+          .append("portName", this.portName)
+          .append("streamId", this.declaredStreamId)
+          .append("sourceNodeId", this.sourceNodeId)
+          .append("sourcePortName", this.sourcePortName)
+          .append("locality", this.locality)
+          .append("partitionMask", this.partitionMask)
+          .append("partitionKeys", this.partitionKeys)
+          .toString();
     }
 
     @Override
@@ -223,7 +227,7 @@ public class OperatorDeployInfo implements Serializable, OperatorContext
     public String bufferServerHost;
     public int bufferServerPort;
     public byte[] bufferServerToken;
-    public Map<Integer, StreamCodec<?>> streamCodecs = new HashMap<Integer, StreamCodec<?>>();
+    public Map<Integer, StreamCodec<?>> streamCodecs = new HashMap<>();
     /**
      * Context attributes for output port
      */
@@ -310,13 +314,13 @@ public class OperatorDeployInfo implements Serializable, OperatorContext
   @Override
   public String toString()
   {
-    return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE).append("id", this.id).
-            append("name", this.name).
-            append("type", this.type).
-            append("checkpoint", this.checkpoint).
-            append("inputs", this.inputs).
-            append("outputs", this.outputs).
-            toString();
+    return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE).append("id", this.id)
+        .append("name", this.name)
+        .append("type", this.type)
+        .append("checkpoint", this.checkpoint)
+        .append("inputs", this.inputs)
+        .append("outputs", this.outputs)
+        .toString();
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/85db448e/engine/src/main/java/com/datatorrent/stram/api/RequestFactory.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/api/RequestFactory.java b/engine/src/main/java/com/datatorrent/stram/api/RequestFactory.java
index 89f673c..55b0732 100644
--- a/engine/src/main/java/com/datatorrent/stram/api/RequestFactory.java
+++ b/engine/src/main/java/com/datatorrent/stram/api/RequestFactory.java
@@ -39,13 +39,12 @@ public class RequestFactory
 
   public RequestFactory()
   {
-    this.map = new EnumMap<StramToNodeRequest.RequestType, RequestDelegate>(StramToNodeRequest.RequestType.class);
+    this.map = new EnumMap<>(StramToNodeRequest.RequestType.class);
   }
 
   public interface RequestDelegate
   {
-    public OperatorRequest getRequestExecutor(final Node<?> node, final StramToNodeRequest snr);
-
+    OperatorRequest getRequestExecutor(final Node<?> node, final StramToNodeRequest snr);
   }
 
   public void registerDelegate(StramToNodeRequest.RequestType requestType, RequestDelegate delegate)

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/85db448e/engine/src/main/java/com/datatorrent/stram/api/StramEvent.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/api/StramEvent.java b/engine/src/main/java/com/datatorrent/stram/api/StramEvent.java
index 8d759db..c64ec60 100644
--- a/engine/src/main/java/com/datatorrent/stram/api/StramEvent.java
+++ b/engine/src/main/java/com/datatorrent/stram/api/StramEvent.java
@@ -18,9 +18,10 @@
  */
 package com.datatorrent.stram.api;
 
-import com.datatorrent.stram.plan.logical.requests.LogicalPlanRequest;
 import java.util.concurrent.atomic.AtomicLong;
 
+import com.datatorrent.stram.plan.logical.requests.LogicalPlanRequest;
+
 /**
  * <p>
  * Abstract StramEvent class.</p>

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/85db448e/engine/src/main/java/com/datatorrent/stram/api/StramToNodeGetPropertyRequest.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/api/StramToNodeGetPropertyRequest.java b/engine/src/main/java/com/datatorrent/stram/api/StramToNodeGetPropertyRequest.java
index 3df6167..651c1fa 100644
--- a/engine/src/main/java/com/datatorrent/stram/api/StramToNodeGetPropertyRequest.java
+++ b/engine/src/main/java/com/datatorrent/stram/api/StramToNodeGetPropertyRequest.java
@@ -24,13 +24,13 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
 
-import org.apache.commons.beanutils.BeanMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.commons.beanutils.BeanMap;
+
 import com.datatorrent.api.Operator;
 import com.datatorrent.api.StatsListener;
-
 import com.datatorrent.stram.engine.OperatorResponse;
 
 /**
@@ -66,16 +66,15 @@ public class StramToNodeGetPropertyRequest extends StreamingContainerUmbilicalPr
     public StatsListener.OperatorResponse execute(Operator operator, int operatorId, long windowId) throws IOException
     {
       BeanMap beanMap = new BeanMap(operator);
-      Map<String, Object> propertyValue = new HashMap<String, Object>();
+      Map<String, Object> propertyValue = new HashMap<>();
       if (propertyName != null) {
         if (beanMap.containsKey(propertyName)) {
           propertyValue.put(propertyName, beanMap.get(propertyName));
         }
-      }
-      else {
+      } else {
         Iterator entryIterator = beanMap.entryIterator();
         while (entryIterator.hasNext()) {
-          Map.Entry<String, Object> entry = (Map.Entry<String, Object>) entryIterator.next();
+          Map.Entry<String, Object> entry = (Map.Entry<String, Object>)entryIterator.next();
           propertyValue.put(entry.getKey(), entry.getValue());
         }
       }

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/85db448e/engine/src/main/java/com/datatorrent/stram/api/StreamingContainerUmbilicalProtocol.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/api/StreamingContainerUmbilicalProtocol.java b/engine/src/main/java/com/datatorrent/stram/api/StreamingContainerUmbilicalProtocol.java
index 50ebf22..d01d2b6 100644
--- a/engine/src/main/java/com/datatorrent/stram/api/StreamingContainerUmbilicalProtocol.java
+++ b/engine/src/main/java/com/datatorrent/stram/api/StreamingContainerUmbilicalProtocol.java
@@ -34,7 +34,6 @@ import com.datatorrent.api.Context;
 import com.datatorrent.api.Stats;
 import com.datatorrent.api.StatsListener;
 import com.datatorrent.api.StatsListener.OperatorRequest;
-
 import com.datatorrent.stram.util.AbstractWritableAdapter;
 
 /**
@@ -51,7 +50,8 @@ import com.datatorrent.stram.util.AbstractWritableAdapter;
 // @TokenInfo(JobTokenSelector.class)
 @InterfaceAudience.Private
 @InterfaceStability.Stable
-public interface StreamingContainerUmbilicalProtocol extends VersionedProtocol {
+public interface StreamingContainerUmbilicalProtocol extends VersionedProtocol
+{
   public static final long versionID = 201208081755L;
 
   /**
@@ -60,9 +60,8 @@ public interface StreamingContainerUmbilicalProtocol extends VersionedProtocol {
    * be deployed and removed dynamically.
    * <p>
    * <br>
-   *
    */
-  public static class StreamingContainerContext extends BaseContext implements ContainerContext
+  class StreamingContainerContext extends BaseContext implements ContainerContext
   {
     /**
      * Operators should start processing the initial window at this time.
@@ -85,9 +84,10 @@ public interface StreamingContainerUmbilicalProtocol extends VersionedProtocol {
     }
 
     @Override
-    public String toString() {
+    public String toString()
+    {
       return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
-              .append("applicationAttributes", getAttributes()).toString();
+          .append("applicationAttributes", getAttributes()).toString();
     }
 
     @SuppressWarnings("FieldNameHidesFieldInSuperclass")
@@ -98,16 +98,17 @@ public interface StreamingContainerUmbilicalProtocol extends VersionedProtocol {
    * Stats of deployed operator sent to the application master
    * <p>
    */
-  public static class OperatorHeartbeat implements Serializable
+  class OperatorHeartbeat implements Serializable
   {
     private static final long serialVersionUID = 201208171625L;
-    public ArrayList<ContainerStats.OperatorStats> windowStats = new ArrayList<ContainerStats.OperatorStats>();
+    public ArrayList<ContainerStats.OperatorStats> windowStats = new ArrayList<>();
 
     /**
      * The operator stats for the windows processed during the heartbeat interval.
      * @return
      */
-    public ArrayList<ContainerStats.OperatorStats> getOperatorStatsContainer() {
+    public ArrayList<ContainerStats.OperatorStats> getOperatorStatsContainer()
+    {
       return windowStats;
     }
 
@@ -116,11 +117,13 @@ public interface StreamingContainerUmbilicalProtocol extends VersionedProtocol {
      */
     public int nodeId;
 
-    public int getNodeId() {
+    public int getNodeId()
+    {
       return nodeId;
     }
 
-    public void setNodeId(int nodeId) {
+    public void setNodeId(int nodeId)
+    {
       this.nodeId = nodeId;
     }
 
@@ -129,11 +132,13 @@ public interface StreamingContainerUmbilicalProtocol extends VersionedProtocol {
      */
     public long generatedTms;
 
-    public long getGeneratedTms() {
+    public long getGeneratedTms()
+    {
       return generatedTms;
     }
 
-    public void setGeneratedTms(long generatedTms) {
+    public void setGeneratedTms(long generatedTms)
+    {
       this.generatedTms = generatedTms;
     }
 
@@ -143,18 +148,21 @@ public interface StreamingContainerUmbilicalProtocol extends VersionedProtocol {
      */
     public long intervalMs;
 
-    public long getIntervalMs() {
+    public long getIntervalMs()
+    {
       return intervalMs;
     }
 
-    public void setIntervalMs(long intervalMs) {
+    public void setIntervalMs(long intervalMs)
+    {
       this.intervalMs = intervalMs;
     }
 
     /**
      * State of the operator (processing, idle etc).
      */
-    public static enum DeployState {
+    public enum DeployState
+    {
       ACTIVE,
       SHUTDOWN,
       FAILED // problemo!
@@ -162,11 +170,13 @@ public interface StreamingContainerUmbilicalProtocol extends VersionedProtocol {
 
     public DeployState state;
 
-    public DeployState getState() {
+    public DeployState getState()
+    {
       return state;
     }
 
-    public void setState(DeployState state) {
+    public void setState(DeployState state)
+    {
       this.state = state;
     }
 
@@ -176,7 +186,7 @@ public interface StreamingContainerUmbilicalProtocol extends VersionedProtocol {
     public ArrayList<StatsListener.OperatorResponse> requestResponse;
   }
 
-  public static class ContainerStats implements Stats
+  class ContainerStats implements Stats
   {
     private static final long serialVersionUID = 201309131904L;
     public final String id;
@@ -185,7 +195,7 @@ public interface StreamingContainerUmbilicalProtocol extends VersionedProtocol {
     public ContainerStats(String id)
     {
       this.id = id;
-      operators = new ArrayList<OperatorHeartbeat>();
+      operators = new ArrayList<>();
     }
 
     @Override
@@ -207,7 +217,7 @@ public interface StreamingContainerUmbilicalProtocol extends VersionedProtocol {
    * <br>
    *
    */
-  public static class ContainerHeartbeat extends AbstractWritableAdapter
+  class ContainerHeartbeat extends AbstractWritableAdapter
   {
     private static final long serialVersionUID = 1L;
 
@@ -228,32 +238,34 @@ public interface StreamingContainerUmbilicalProtocol extends VersionedProtocol {
 
     public long sentTms = System.currentTimeMillis();
 
-    public ContainerStats getContainerStats() {
+    public ContainerStats getContainerStats()
+    {
       return stats;
     }
 
-    public void setContainerStats(ContainerStats stats) {
+    public void setContainerStats(ContainerStats stats)
+    {
       this.stats = stats;
     }
 
-    public String getContainerId() {
+    public String getContainerId()
+    {
       return stats.id;
     }
 
   }
 
   /**
-   *
    * Request by stram as response to heartbeat for further communication
    * <p>
    * <br>
    * The child container will continue RPC communication depending on the type
    * of request.<br>
    * <br>
-   *
    */
-  public static class StramToNodeRequest implements Serializable {
-    public static enum RequestType
+  class StramToNodeRequest implements Serializable
+  {
+    public enum RequestType
     {
       START_RECORDING, STOP_RECORDING, SYNC_RECORDING, SET_LOG_LEVEL, CUSTOM
     }
@@ -278,44 +290,53 @@ public interface StreamingContainerUmbilicalProtocol extends VersionedProtocol {
       this.deleted = deleted;
     }
 
-    public int getOperatorId() {
+    public int getOperatorId()
+    {
       return operatorId;
     }
 
-    public void setOperatorId(int id) {
+    public void setOperatorId(int id)
+    {
       this.operatorId = id;
     }
 
-    public StramToNodeRequest.RequestType getRequestType() {
+    public StramToNodeRequest.RequestType getRequestType()
+    {
       return requestType;
     }
 
-    public void setRequestType(StramToNodeRequest.RequestType requestType) {
+    public void setRequestType(StramToNodeRequest.RequestType requestType)
+    {
       this.requestType = requestType;
     }
 
-    public long getRecoveryCheckpoint() {
+    public long getRecoveryCheckpoint()
+    {
       return recoveryCheckpoint;
     }
 
-    public void setRecoveryCheckpoint(long recoveryCheckpoint) {
+    public void setRecoveryCheckpoint(long recoveryCheckpoint)
+    {
       this.recoveryCheckpoint = recoveryCheckpoint;
     }
 
-    public String getPortName() {
+    public String getPortName()
+    {
       return portName;
     }
 
-    public void setPortName(String portName) {
+    public void setPortName(String portName)
+    {
       this.portName = portName;
     }
 
     @Override
-    public String toString() {
+    public String toString()
+    {
       return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
-              .append("operatorId", this.operatorId)
-              .append("requestType", this.requestType)
-              .append("portName", this.portName).toString();
+          .append("operatorId", this.operatorId)
+          .append("requestType", this.requestType)
+          .append("portName", this.portName).toString();
     }
   }
 
@@ -326,7 +347,8 @@ public interface StreamingContainerUmbilicalProtocol extends VersionedProtocol {
    * <br>
    *
    */
-  public static class ContainerHeartbeatResponse extends AbstractWritableAdapter {
+  class ContainerHeartbeatResponse extends AbstractWritableAdapter
+  {
     private static final long serialVersionUID = 1L;
     /**
      * Indicate container to exit heartbeat loop and shutdown.
@@ -368,8 +390,7 @@ public interface StreamingContainerUmbilicalProtocol extends VersionedProtocol {
    *
    * @param containerId
    * @return
-   * @throws IOException
-   * <br>
+   * @throws IOException <br>
    */
   StreamingContainerContext getInitContext(String containerId) throws IOException;
 

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/85db448e/engine/src/main/java/com/datatorrent/stram/appdata/AppDataPushAgent.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/appdata/AppDataPushAgent.java b/engine/src/main/java/com/datatorrent/stram/appdata/AppDataPushAgent.java
index 9297f89..3fe0373 100644
--- a/engine/src/main/java/com/datatorrent/stram/appdata/AppDataPushAgent.java
+++ b/engine/src/main/java/com/datatorrent/stram/appdata/AppDataPushAgent.java
@@ -25,23 +25,25 @@ import java.beans.PropertyDescriptor;
 import java.io.IOException;
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
 
-import org.apache.commons.lang3.ClassUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.service.AbstractService;
 import org.codehaus.jettison.json.JSONArray;
 import org.codehaus.jettison.json.JSONException;
 import org.codehaus.jettison.json.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Maps;
+import org.apache.commons.lang3.ClassUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.AbstractService;
 
 import com.datatorrent.api.AutoMetric;
 import com.datatorrent.api.Context.DAGContext;
-
-import com.datatorrent.api.StringCodec;
 import com.datatorrent.common.metric.AutoMetricBuiltInTransport;
 import com.datatorrent.common.util.Pair;
 import com.datatorrent.stram.PubSubWebSocketMetricTransport;
@@ -62,16 +64,15 @@ public class AppDataPushAgent extends AbstractService
   private static final String METRICS_SCHEMA_VERSION = "1.0";
   private static final String DATA = "data";
   private static final Logger LOG = LoggerFactory.getLogger(AppDataPushAgent.class);
-  private static final String APP_DATA_PUSH_TRANSPORT_BUILTIN_VALUE = "builtin";
   private final StreamingContainerManager dnmgr;
   private final StramAppContext appContext;
   private final AppDataPushThread appDataPushThread = new AppDataPushThread();
   private AutoMetric.Transport metricsTransport;
-  private final Map<Class<?>, List<Field>> cacheFields = new HashMap<Class<?>, List<Field>>();
-  private final Map<Class<?>, Map<String, Method>> cacheGetMethods = new HashMap<Class<?>, Map<String, Method>>();
+  private final Map<Class<?>, List<Field>> cacheFields = new HashMap<>();
+  private final Map<Class<?>, Map<String, Method>> cacheGetMethods = new HashMap<>();
 
-  private final Map<String, Long> operatorsSchemaLastSentTime = Maps.newHashMap();
-  private final Map<String, JSONObject> operatorSchemas = Maps.newHashMap();
+  private final Map<String, Long> operatorsSchemaLastSentTime = new HashMap<>();
+  private final Map<String, JSONObject> operatorSchemas = new HashMap<>();
 
   public AppDataPushAgent(StreamingContainerManager dnmgr, StramAppContext appContext)
   {
@@ -143,8 +144,7 @@ public class AppDataPushAgent extends AbstractService
             Map<String, Object> aggregates = metrics.second;
             long now = System.currentTimeMillis();
             if (!operatorsSchemaLastSentTime.containsKey(logicalOperator.name) ||
-                (metricsTransport.getSchemaResendInterval() > 0 &&
-                    operatorsSchemaLastSentTime.get(logicalOperator.name) < now - metricsTransport.getSchemaResendInterval())) {
+                (metricsTransport.getSchemaResendInterval() > 0 && operatorsSchemaLastSentTime.get(logicalOperator.name) < now - metricsTransport.getSchemaResendInterval())) {
               try {
                 pushMetricsSchema(dnmgr.getLogicalPlan().getOperatorMeta(logicalOperator.name), aggregates);
                 operatorsSchemaLastSentTime.put(logicalOperator.name, now);
@@ -185,7 +185,7 @@ public class AppDataPushAgent extends AbstractService
     if (cacheFields.containsKey(o.getClass())) {
       fields = cacheFields.get(o.getClass());
     } else {
-      fields = new ArrayList<Field>();
+      fields = new ArrayList<>();
 
       for (Class<?> c = o.getClass(); c != Object.class; c = c.getSuperclass()) {
         Field[] declaredFields = c.getDeclaredFields();
@@ -215,7 +215,7 @@ public class AppDataPushAgent extends AbstractService
     if (cacheGetMethods.containsKey(o.getClass())) {
       methods = cacheGetMethods.get(o.getClass());
     } else {
-      methods = new HashMap<String, Method>();    
+      methods = new HashMap<>();
       try {
         BeanInfo info = Introspector.getBeanInfo(o.getClass());
         for (PropertyDescriptor pd : info.getPropertyDescriptors()) {


Mime
View raw message