nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From joew...@apache.org
Subject [10/11] incubator-nifi git commit: NIFI-271 checkpoint
Date Wed, 22 Apr 2015 15:46:53 GMT
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ConnectionResponse.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ConnectionResponse.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ConnectionResponse.java
index 7a5ff2b..4e06926 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ConnectionResponse.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ConnectionResponse.java
@@ -21,13 +21,12 @@ import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
 import org.apache.nifi.cluster.protocol.jaxb.message.ConnectionResponseAdapter;
 
 /**
- * The cluster manager's response to a node's connection request.  If the manager
- * has a current copy of the data flow, then it is returned with a node identifier
- * to the node.  Otherwise, the manager will provide a "try again in X seconds" 
- * response to the node in hopes that a current data flow will be available upon
- * subsequent requests.
- * 
- * @author unattributed
+ * The cluster manager's response to a node's connection request. If the manager
+ * has a current copy of the data flow, then it is returned with a node
+ * identifier to the node. Otherwise, the manager will provide a "try again in X
+ * seconds" response to the node in hopes that a current data flow will be
+ * available upon subsequent requests.
+ *
  */
 @XmlJavaTypeAdapter(ConnectionResponseAdapter.class)
 public class ConnectionResponse {
@@ -40,14 +39,14 @@ public class ConnectionResponse {
     private final Integer managerRemoteInputPort;
     private final Boolean managerRemoteCommsSecure;
     private final String instanceId;
-    
+
     private volatile String clusterManagerDN;
-    
-    public ConnectionResponse(final NodeIdentifier nodeIdentifier, final StandardDataFlow dataFlow, final boolean primary, 
-        final Integer managerRemoteInputPort, final Boolean managerRemoteCommsSecure, final String instanceId) {
-        if(nodeIdentifier == null) {
+
+    public ConnectionResponse(final NodeIdentifier nodeIdentifier, final StandardDataFlow dataFlow, final boolean primary,
+            final Integer managerRemoteInputPort, final Boolean managerRemoteCommsSecure, final String instanceId) {
+        if (nodeIdentifier == null) {
             throw new IllegalArgumentException("Node identifier may not be empty or null.");
-        } else if(dataFlow == null) {
+        } else if (dataFlow == null) {
             throw new IllegalArgumentException("DataFlow may not be null.");
         }
         this.nodeIdentifier = nodeIdentifier;
@@ -59,9 +58,9 @@ public class ConnectionResponse {
         this.managerRemoteCommsSecure = managerRemoteCommsSecure;
         this.instanceId = instanceId;
     }
-    
+
     public ConnectionResponse(final int tryLaterSeconds) {
-        if(tryLaterSeconds <= 0) {
+        if (tryLaterSeconds <= 0) {
             throw new IllegalArgumentException("Try-Later seconds may not be nonnegative: " + tryLaterSeconds);
         }
         this.dataFlow = null;
@@ -84,19 +83,19 @@ public class ConnectionResponse {
         this.managerRemoteCommsSecure = null;
         this.instanceId = null;
     }
-    
+
     public static ConnectionResponse createBlockedByFirewallResponse() {
         return new ConnectionResponse();
     }
-    
+
     public boolean isPrimary() {
         return primary;
     }
-    
+
     public boolean shouldTryLater() {
         return tryLaterSeconds > 0;
     }
-    
+
     public boolean isBlockedByFirewall() {
         return blockedByFirewall;
     }
@@ -104,11 +103,11 @@ public class ConnectionResponse {
     public int getTryLaterSeconds() {
         return tryLaterSeconds;
     }
-    
+
     public StandardDataFlow getDataFlow() {
         return dataFlow;
     }
-    
+
     public NodeIdentifier getNodeIdentifier() {
         return nodeIdentifier;
     }
@@ -116,23 +115,22 @@ public class ConnectionResponse {
     public Integer getManagerRemoteInputPort() {
         return managerRemoteInputPort;
     }
-    
+
     public Boolean isManagerRemoteCommsSecure() {
         return managerRemoteCommsSecure;
     }
-    
+
     public String getInstanceId() {
         return instanceId;
     }
-    
+
     public void setClusterManagerDN(final String dn) {
         this.clusterManagerDN = dn;
     }
-    
+
     /**
-     * Returns the DN of the NCM, if it is available or <code>null</code> otherwise.
-     * 
-     * @return
+     * @return the DN of the NCM, if it is available or <code>null</code>
+     * otherwise
      */
     public String getClusterManagerDN() {
         return clusterManagerDN;

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/Heartbeat.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/Heartbeat.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/Heartbeat.java
index 67324a1..04fb3f0 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/Heartbeat.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/Heartbeat.java
@@ -23,44 +23,45 @@ import org.apache.nifi.cluster.protocol.jaxb.message.HeartbeatAdapter;
 
 /**
  * A heartbeat for indicating the status of a node to the cluster.
+ *
  * @author unattributed
  */
 @XmlJavaTypeAdapter(HeartbeatAdapter.class)
 public class Heartbeat {
-    
+
     private final NodeIdentifier nodeIdentifier;
     private final boolean primary;
     private final boolean connected;
     private final long createdTimestamp;
     private final byte[] payload;
-    
+
     public Heartbeat(final NodeIdentifier nodeIdentifier, final boolean primary, final boolean connected, final byte[] payload) {
-        if(nodeIdentifier == null) {
+        if (nodeIdentifier == null) {
             throw new IllegalArgumentException("Node Identifier may not be null.");
-        } 
+        }
         this.nodeIdentifier = nodeIdentifier;
         this.primary = primary;
         this.connected = connected;
         this.payload = payload;
         this.createdTimestamp = new Date().getTime();
     }
-    
+
     public NodeIdentifier getNodeIdentifier() {
         return nodeIdentifier;
     }
-    
+
     public byte[] getPayload() {
         return payload;
     }
-    
+
     public boolean isPrimary() {
         return primary;
     }
-    
+
     public boolean isConnected() {
         return connected;
     }
-    
+
     @XmlTransient
     public long getCreatedTimestamp() {
         return createdTimestamp;

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeBulletins.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeBulletins.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeBulletins.java
index a120524..86df107 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeBulletins.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeBulletins.java
@@ -40,5 +40,5 @@ public class NodeBulletins {
     public byte[] getPayload() {
         return payload;
     }
-    
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeIdentifier.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeIdentifier.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeIdentifier.java
index 1893186..4b10be6 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeIdentifier.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeIdentifier.java
@@ -19,56 +19,68 @@ package org.apache.nifi.cluster.protocol;
 import org.apache.commons.lang3.StringUtils;
 
 /**
- * A node identifier denoting the coordinates of a flow controller that is connected 
- * to a cluster.  Nodes provide an external public API interface and an internal private
- * interface for communicating with the cluster.
- * 
- * The external API interface and internal protocol each require an IP or hostname 
- * as well as a port for communicating. 
- * 
+ * A node identifier denoting the coordinates of a flow controller that is
+ * connected to a cluster. Nodes provide an external public API interface and an
+ * internal private interface for communicating with the cluster.
+ *
+ * The external API interface and internal protocol each require an IP or
+ * hostname as well as a port for communicating.
+ *
  * This class overrides hashCode and equals and considers two instances to be
  * equal if they have the equal IDs.
- * 
+ *
  * @author unattributed
  * @Immutable
  * @Threadsafe
  */
 public class NodeIdentifier {
- 
-    /** the unique identifier for the node */
+
+    /**
+     * the unique identifier for the node
+     */
     private final String id;
-    
-    /** the IP or hostname to use for sending requests to the node's external interface */
+
+    /**
+     * the IP or hostname to use for sending requests to the node's external
+     * interface
+     */
     private final String apiAddress;
-    
-    /** the port to use use for sending requests to the node's external interface */
-    private final int apiPort;    
-    
-    /** the IP or hostname to use for sending requests to the node's internal interface */
+
+    /**
+     * the port to use for sending requests to the node's external interface
+     */
+    private final int apiPort;
+
+    /**
+     * the IP or hostname to use for sending requests to the node's internal
+     * interface
+     */
     private final String socketAddress;
-    
-    /** the port to use use for sending requests to the node's internal interface */
+
+    /**
+     * the port to use for sending requests to the node's internal interface
+     */
     private final int socketPort;
-    
+
     private final String nodeDn;
 
     public NodeIdentifier(final String id, final String apiAddress, final int apiPort, final String socketAddress, final int socketPort) {
         this(id, apiAddress, apiPort, socketAddress, socketPort, null);
     }
-    
+
     public NodeIdentifier(final String id, final String apiAddress, final int apiPort, final String socketAddress, final int socketPort, final String dn) {
-        
-        if(StringUtils.isBlank(id)) {
+
+        if (StringUtils.isBlank(id)) {
             throw new IllegalArgumentException("Node ID may not be empty or null.");
-        } else if(StringUtils.isBlank(apiAddress)) {
+        } else if (StringUtils.isBlank(apiAddress)) {
             throw new IllegalArgumentException("Node API address may not be empty or null.");
-        } else if(StringUtils.isBlank(socketAddress)) {
+        } else if (StringUtils.isBlank(socketAddress)) {
             throw new IllegalArgumentException("Node socket address may not be empty or null.");
-        } 
-        
+        }
+
         validatePort(apiPort);
         validatePort(socketPort);
-        
+
         this.id = id;
         this.apiAddress = apiAddress;
         this.apiPort = apiPort;
@@ -80,11 +92,11 @@ public class NodeIdentifier {
     public String getId() {
         return id;
     }
-    
+
     public String getDN() {
         return nodeDn;
     }
-    
+
     public String getApiAddress() {
         return apiAddress;
     }
@@ -96,22 +108,22 @@ public class NodeIdentifier {
     public String getSocketAddress() {
         return socketAddress;
     }
-    
+
     public int getSocketPort() {
         return socketPort;
     }
-    
+
     private void validatePort(final int port) {
-        if(port < 1 || port > 65535) {
+        if (port < 1 || port > 65535) {
             throw new IllegalArgumentException("Port must be inclusively in the range [1, 65535].  Port given: " + port);
-        }   
+        }
     }
-    
+
     /**
      * Compares the id of two node identifiers for equality.
-     * 
+     *
      * @param obj a node identifier
-     * 
+     *
      * @return true if the id is equal; false otherwise
      */
     @Override
@@ -130,33 +142,33 @@ public class NodeIdentifier {
     }
 
     /**
-     * Compares API address/port and socket address/port for equality.  The 
-     * id is not used for comparison.
-     * 
+     * Compares API address/port and socket address/port for equality. The id is
+     * not used for comparison.
+     *
      * @param other a node identifier
-     * 
+     *
      * @return true if API address/port and socket address/port are equal; false
      * otherwise
      */
     public boolean logicallyEquals(final NodeIdentifier other) {
-        if(other == null) {
+        if (other == null) {
             return false;
         }
         if ((this.apiAddress == null) ? (other.apiAddress != null) : !this.apiAddress.equals(other.apiAddress)) {
             return false;
         }
-        if(this.apiPort != other.apiPort) {
+        if (this.apiPort != other.apiPort) {
             return false;
         }
         if ((this.socketAddress == null) ? (other.socketAddress != null) : !this.socketAddress.equals(other.socketAddress)) {
             return false;
         }
-        if(this.socketPort != other.socketPort) {
+        if (this.socketPort != other.socketPort) {
             return false;
         }
         return true;
     }
-    
+
     @Override
     public int hashCode() {
         int hash = 7;

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeProtocolSender.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeProtocolSender.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeProtocolSender.java
index 1edcb91..f3e5df4 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeProtocolSender.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeProtocolSender.java
@@ -24,50 +24,61 @@ import org.apache.nifi.cluster.protocol.message.NodeBulletinsMessage;
 import org.apache.nifi.cluster.protocol.message.ReconnectionFailureMessage;
 
 /**
- * An interface for sending protocol messages from a node to the cluster manager.
- * @author unattributed
+ * An interface for sending protocol messages from a node to the cluster
+ * manager.
+ *
  */
 public interface NodeProtocolSender {
-    
+
     /**
      * Sends a "connection request" message to the cluster manager.
+     *
      * @param msg a message
      * @return the response
-     * @throws UnknownServiceAddressException if the cluster manager's address is not known
+     * @throws UnknownServiceAddressException if the cluster manager's address
+     * is not known
      * @throws ProtocolException if communication failed
      */
     ConnectionResponseMessage requestConnection(ConnectionRequestMessage msg) throws ProtocolException, UnknownServiceAddressException;
-    
+
     /**
      * Sends a "heartbeat" message to the cluster manager.
+     *
      * @param msg a message
-     * @throws UnknownServiceAddressException if the cluster manager's address is not known
+     * @throws UnknownServiceAddressException if the cluster manager's address
+     * is not known
      * @throws ProtocolException if communication failed
      */
     void heartbeat(HeartbeatMessage msg) throws ProtocolException, UnknownServiceAddressException;
-    
+
     /**
      * Sends a bulletins message to the cluster manager.
-     * @param msg
-     * @throws ProtocolException
-     * @throws UnknownServiceAddressException 
+     *
+     * @param msg a message
+     * @throws ProtocolException if communication failed
+     * @throws UnknownServiceAddressException if the cluster manager's address is not known
      */
     void sendBulletins(NodeBulletinsMessage msg) throws ProtocolException, UnknownServiceAddressException;
-    
+
     /**
      * Sends a failure notification if the controller was unable start.
+     *
      * @param msg a message
-     * @throws UnknownServiceAddressException if the cluster manager's address is not known
+     * @throws UnknownServiceAddressException if the cluster manager's address
+     * is not known
      * @throws ProtocolException if communication failed
      */
     void notifyControllerStartupFailure(ControllerStartupFailureMessage msg) throws ProtocolException, UnknownServiceAddressException;
-    
+
     /**
-     * Sends a failure notification if the node was unable to reconnect to the cluster
+     * Sends a failure notification if the node was unable to reconnect to the
+     * cluster
+     *
      * @param msg a message
-     * @throws UnknownServiceAddressException if the cluster manager's address is not known
+     * @throws UnknownServiceAddressException if the cluster manager's address
+     * is not known
      * @throws ProtocolException if communication failed
      */
     void notifyReconnectionFailure(ReconnectionFailureMessage msg) throws ProtocolException, UnknownServiceAddressException;
-    
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolContext.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolContext.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolContext.java
index b614e76..11a3912 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolContext.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolContext.java
@@ -17,22 +17,24 @@
 package org.apache.nifi.cluster.protocol;
 
 /**
- * The context for communicating using the internal cluster protocol. 
- * 
+ * The context for communicating using the internal cluster protocol.
+ *
  * @param <T> The type of protocol message.
- * 
+ *
  * @author unattributed
  */
 public interface ProtocolContext<T> {
- 
+
     /**
      * Creates a marshaller for serializing protocol messages.
+     *
      * @return a marshaller
      */
     ProtocolMessageMarshaller<T> createMarshaller();
-    
+
     /**
      * Creates an unmarshaller for deserializing protocol messages.
+     *
      * @return a unmarshaller
      */
     ProtocolMessageUnmarshaller<T> createUnmarshaller();

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolException.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolException.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolException.java
index f11ad84..b6c3737 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolException.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolException.java
@@ -19,21 +19,22 @@ package org.apache.nifi.cluster.protocol;
 /**
  * The base exception for problems encountered while communicating within the
  * cluster.
+ *
  * @author unattributed
  */
 public class ProtocolException extends RuntimeException {
-    
+
     public ProtocolException() {
     }
-    
+
     public ProtocolException(String msg) {
         super(msg);
     }
-    
+
     public ProtocolException(Throwable cause) {
         super(cause);
     }
-    
+
     public ProtocolException(String msg, Throwable cause) {
         super(msg, cause);
     }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolHandler.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolHandler.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolHandler.java
index 6de87db..b2bace9 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolHandler.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolHandler.java
@@ -20,25 +20,26 @@ import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
 
 /**
  * A handler for processing protocol messages.
- * @author unattributed
+ *
  */
 public interface ProtocolHandler {
-    
+
     /**
      * Handles the given protocol message or throws an exception if it cannot
-     * handle the message.  If no response is needed by the protocol, then null
+     * handle the message. If no response is needed by the protocol, then null
      * should be returned.
-     * 
+     *
      * @param msg a message
      * @return a response or null, if no response is necessary
-     * 
+     *
      * @throws ProtocolException if the message could not be processed
      */
     ProtocolMessage handle(ProtocolMessage msg) throws ProtocolException;
-    
+
     /**
-     * @param msg
-     * @return true if the handler can process the given message; false otherwise
+     * @param msg a message
+     * @return true if the handler can process the given message; false
+     * otherwise
      */
     boolean canHandle(ProtocolMessage msg);
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolListener.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolListener.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolListener.java
index 32f0f5d..2f35241 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolListener.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolListener.java
@@ -23,48 +23,53 @@ import org.apache.nifi.reporting.BulletinRepository;
 
 /**
  * Defines the interface for a listener to process protocol messages.
- * @author unattributed
+ *
  */
 public interface ProtocolListener {
-    
+
     /**
-     * Starts the instance for listening for messages.  Start may only be called
+     * Starts the instance for listening for messages. Start may only be called
      * if the instance is not running.
-     * @throws java.io.IOException
+     *
+     * @throws java.io.IOException if an I/O error occurs while starting
      */
     void start() throws IOException;
-    
+
     /**
-     * Stops the instance from listening for messages.  Stop may only be called
+     * Stops the instance from listening for messages. Stop may only be called
      * if the instance is running.
-     * @throws java.io.IOException
+     *
+     * @throws java.io.IOException if an I/O error occurs while stopping
      */
     void stop() throws IOException;
-    
+
     /**
      * @return true if the instance is started; false otherwise.
      */
     boolean isRunning();
-    
+
     /**
      * @return the handlers registered with the listener
      */
     Collection<ProtocolHandler> getHandlers();
-    
+
     /**
      * Registers a handler with the listener.
+     *
      * @param handler a handler
      */
     void addHandler(ProtocolHandler handler);
-    
+
     /**
      * Sets the BulletinRepository that can be used to report bulletins
-     * @param bulletinRepository
+     *
+     * @param bulletinRepository the repository that can be used to report bulletins
      */
     void setBulletinRepository(BulletinRepository bulletinRepository);
-    
+
     /**
      * Unregisters the handler with the listener.
+     *
      * @param handler a handler
      * @return true if the handler was removed; false otherwise
      */

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolMessageMarshaller.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolMessageMarshaller.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolMessageMarshaller.java
index bb436e0..4e43d4d 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolMessageMarshaller.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolMessageMarshaller.java
@@ -21,15 +21,16 @@ import java.io.OutputStream;
 
 /**
  * Defines a marshaller for serializing protocol messages.
- * 
+ *
  * @param <T> The type of protocol message.
- * 
+ *
  * @author unattributed
  */
 public interface ProtocolMessageMarshaller<T> {
-    
+
     /**
      * Serializes the given message to the given output stream.
+     *
      * @param msg a message
      * @param os an output stream
      * @throws IOException if the message could not be serialized to the stream

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolMessageUnmarshaller.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolMessageUnmarshaller.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolMessageUnmarshaller.java
index c690e7b..e8910bd 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolMessageUnmarshaller.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolMessageUnmarshaller.java
@@ -21,18 +21,19 @@ import java.io.InputStream;
 
 /**
  * Defines an unmarshaller for deserializing protocol messages.
- * 
+ *
  * @param <T> The type of protocol message.
- * 
- * @author unattributed
+ *
  */
 public interface ProtocolMessageUnmarshaller<T> {
-    
+
     /**
      * Deserializes a message on the given input stream.
+     *
      * @param is an input stream
-     * @return 
-     * @throws IOException if the message could not be deserialized from the stream
+     * @return deserialized message
+     * @throws IOException if the message could not be deserialized from the
+     * stream
      */
     T unmarshal(InputStream is) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/StandardDataFlow.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/StandardDataFlow.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/StandardDataFlow.java
index c2d16fc..0f0ed69 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/StandardDataFlow.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/StandardDataFlow.java
@@ -25,25 +25,25 @@ import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
 import org.apache.nifi.cluster.protocol.jaxb.message.DataFlowAdapter;
 
 /**
- * Represents a dataflow, which includes the raw bytes of the flow.xml and 
+ * Represents a dataflow, which includes the raw bytes of the flow.xml and
  * whether processors should be started automatically at application startup.
  */
 @XmlJavaTypeAdapter(DataFlowAdapter.class)
 public class StandardDataFlow implements Serializable, DataFlow {
-    
+
     private final byte[] flow;
     private final byte[] templateBytes;
     private final byte[] snippetBytes;
 
     private boolean autoStartProcessors;
-    
+
     /**
-     * Constructs an instance.  
-     * 
+     * Constructs an instance.
+     *
      * @param flow a valid flow as bytes, which cannot be null
      * @param templateBytes an XML representation of templates
      * @param snippetBytes an XML representation of snippets
-     * 
+     *
      * @throws NullPointerException if any argument is null
      */
     public StandardDataFlow(final byte[] flow, final byte[] templateBytes, final byte[] snippetBytes) {
@@ -51,20 +51,20 @@ public class StandardDataFlow implements Serializable, DataFlow {
         this.templateBytes = templateBytes;
         this.snippetBytes = snippetBytes;
     }
-    
+
     public StandardDataFlow(final DataFlow toCopy) {
         this.flow = copy(toCopy.getFlow());
         this.templateBytes = copy(toCopy.getTemplates());
         this.snippetBytes = copy(toCopy.getSnippets());
         this.autoStartProcessors = toCopy.isAutoStartProcessors();
     }
-    
+
     private static byte[] copy(final byte[] bytes) {
         return bytes == null ? null : Arrays.copyOf(bytes, bytes.length);
     }
-    
+
     /**
-     * @return the raw byte array of the flow 
+     * @return the raw byte array of the flow
      */
     public byte[] getFlow() {
         return flow;
@@ -76,26 +76,26 @@ public class StandardDataFlow implements Serializable, DataFlow {
     public byte[] getTemplates() {
         return templateBytes;
     }
-    
+
     /**
      * @return the raw byte array of the snippets
      */
     public byte[] getSnippets() {
         return snippetBytes;
     }
-    
+
     /**
-     * @return true if processors should be automatically started at application 
-     * startup; false otherwise 
+     * @return true if processors should be automatically started at application
+     * startup; false otherwise
      */
     public boolean isAutoStartProcessors() {
         return autoStartProcessors;
     }
-    
+
     /**
-     * 
+     *
      * Sets the flag to automatically start processors at application startup.
-     * 
+     *
      * @param autoStartProcessors true if processors should be automatically
      * started at application startup; false otherwise
      */

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/UnknownServiceAddressException.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/UnknownServiceAddressException.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/UnknownServiceAddressException.java
index 41c74eb..dc22ba0 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/UnknownServiceAddressException.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/UnknownServiceAddressException.java
@@ -18,21 +18,22 @@ package org.apache.nifi.cluster.protocol;
 
 /**
  * Represents the exceptional case when a service's address is not known.
+ *
  * @author unattributed
  */
 public class UnknownServiceAddressException extends RuntimeException {
-    
+
     public UnknownServiceAddressException() {
     }
-    
+
     public UnknownServiceAddressException(String msg) {
         super(msg);
     }
-    
+
     public UnknownServiceAddressException(Throwable cause) {
         super(cause);
     }
-    
+
     public UnknownServiceAddressException(String msg, Throwable cause) {
         super(msg, cause);
     }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImpl.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImpl.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImpl.java
index ceb3fcb..636a6d3 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImpl.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImpl.java
@@ -43,34 +43,32 @@ import org.apache.nifi.util.FormatUtils;
 
 /**
  * A protocol sender for sending protocol messages from the cluster manager to
- * nodes.  
- * 
- * Connection-type requests (e.g., reconnection, disconnection) by nature of 
- * starting/stopping flow controllers take longer than other types of protocol 
- * messages.  Therefore, a handshake timeout may be specified to lengthen the 
+ * nodes.
+ *
+ * Connection-type requests (e.g., reconnection, disconnection) by nature of
+ * starting/stopping flow controllers take longer than other types of protocol
+ * messages. Therefore, a handshake timeout may be specified to lengthen the
  * allowable time for communication with the node.
- * 
- * @author unattributed
+ *
  */
 public class ClusterManagerProtocolSenderImpl implements ClusterManagerProtocolSender {
 
-    
     private final ProtocolContext<ProtocolMessage> protocolContext;
     private final SocketConfiguration socketConfiguration;
     private int handshakeTimeoutSeconds;
     private volatile BulletinRepository bulletinRepository;
 
     public ClusterManagerProtocolSenderImpl(final SocketConfiguration socketConfiguration, final ProtocolContext<ProtocolMessage> protocolContext) {
-        if(socketConfiguration == null) {
+        if (socketConfiguration == null) {
             throw new IllegalArgumentException("Socket configuration may not be null.");
-        } else if(protocolContext == null) {
+        } else if (protocolContext == null) {
             throw new IllegalArgumentException("Protocol Context may not be null.");
         }
         this.socketConfiguration = socketConfiguration;
         this.protocolContext = protocolContext;
         this.handshakeTimeoutSeconds = -1;  // less than zero denotes variable not configured
     }
-    
+
     @Override
     public void setBulletinRepository(final BulletinRepository bulletinRepository) {
         this.bulletinRepository = bulletinRepository;
@@ -78,76 +76,79 @@ public class ClusterManagerProtocolSenderImpl implements ClusterManagerProtocolS
 
     /**
      * Requests the data flow from a node.
+     *
      * @param msg a message
      * @return the message response
-     * @throws @throws ProtocolException if the message failed to be sent or the response was malformed
+     * @throws ProtocolException if the message failed to be sent or the
+     * response was malformed
      */
     @Override
     public FlowResponseMessage requestFlow(final FlowRequestMessage msg) throws ProtocolException {
         Socket socket = null;
         try {
-        	socket = createSocket(msg.getNodeId(), false);
-            
+            socket = createSocket(msg.getNodeId(), false);
+
             try {
                 // marshal message to output stream
                 final ProtocolMessageMarshaller<ProtocolMessage> marshaller = protocolContext.createMarshaller();
                 marshaller.marshal(msg, socket.getOutputStream());
-            } catch(final IOException ioe) {
+            } catch (final IOException ioe) {
                 throw new ProtocolException("Failed marshalling '" + msg.getType() + "' protocol message due to: " + ioe, ioe);
             }
-            
+
             final ProtocolMessage response;
             try {
                 // unmarshall response and return
                 final ProtocolMessageUnmarshaller<ProtocolMessage> unmarshaller = protocolContext.createUnmarshaller();
                 response = unmarshaller.unmarshal(socket.getInputStream());
-            } catch(final IOException ioe) {
+            } catch (final IOException ioe) {
                 throw new ProtocolException("Failed unmarshalling '" + MessageType.FLOW_RESPONSE + "' protocol message due to: " + ioe, ioe);
-            } 
-            
-            if(MessageType.FLOW_RESPONSE == response.getType()) {
+            }
+
+            if (MessageType.FLOW_RESPONSE == response.getType()) {
                 return (FlowResponseMessage) response;
             } else {
                 throw new ProtocolException("Expected message type '" + MessageType.FLOW_RESPONSE + "' but found '" + response.getType() + "'");
             }
-            
+
         } finally {
             SocketUtils.closeQuietly(socket);
         }
     }
 
     /**
-     * Requests a node to reconnect to the cluster.  The configured value for
+     * Requests a node to reconnect to the cluster. The configured value for
      * handshake timeout is applied to the socket before making the request.
+     *
      * @param msg a message
      * @return the response
-     * @throws ProtocolException if the message failed to be sent or the response was malformed
+     * @throws ProtocolException if the message failed to be sent or the
+     * response was malformed
      */
     @Override
     public ReconnectionResponseMessage requestReconnection(final ReconnectionRequestMessage msg) throws ProtocolException {
         Socket socket = null;
         try {
-        	socket = createSocket(msg.getNodeId(), true);
+            socket = createSocket(msg.getNodeId(), true);
 
             // marshal message to output stream
             try {
                 final ProtocolMessageMarshaller<ProtocolMessage> marshaller = protocolContext.createMarshaller();
                 marshaller.marshal(msg, socket.getOutputStream());
-            } catch(final IOException ioe) {
+            } catch (final IOException ioe) {
                 throw new ProtocolException("Failed marshalling '" + msg.getType() + "' protocol message due to: " + ioe, ioe);
             }
-            
-            
+
             final ProtocolMessage response;
             try {
                 // unmarshall response and return
                 final ProtocolMessageUnmarshaller<ProtocolMessage> unmarshaller = protocolContext.createUnmarshaller();
                 response = unmarshaller.unmarshal(socket.getInputStream());
-            } catch(final IOException ioe) {
+            } catch (final IOException ioe) {
                 throw new ProtocolException("Failed unmarshalling '" + MessageType.RECONNECTION_RESPONSE + "' protocol message due to: " + ioe, ioe);
-            } 
-            
-            if(MessageType.RECONNECTION_RESPONSE == response.getType()) {
+            }
+
+            if (MessageType.RECONNECTION_RESPONSE == response.getType()) {
                 return (ReconnectionResponseMessage) response;
             } else {
                 throw new ProtocolException("Expected message type '" + MessageType.FLOW_RESPONSE + "' but found '" + response.getType() + "'");
@@ -156,10 +157,11 @@ public class ClusterManagerProtocolSenderImpl implements ClusterManagerProtocolS
             SocketUtils.closeQuietly(socket);
         }
     }
-    
+
     /**
-     * Requests a node to disconnect from the cluster.  The configured value for
+     * Requests a node to disconnect from the cluster. The configured value for
      * handshake timeout is applied to the socket before making the request.
+     *
      * @param msg a message
      * @throws ProtocolException if the message failed to be sent
      */
@@ -167,13 +169,13 @@ public class ClusterManagerProtocolSenderImpl implements ClusterManagerProtocolS
     public void disconnect(final DisconnectMessage msg) throws ProtocolException {
         Socket socket = null;
         try {
-        	socket = createSocket(msg.getNodeId(), true);
+            socket = createSocket(msg.getNodeId(), true);
 
             // marshal message to output stream
             try {
                 final ProtocolMessageMarshaller<ProtocolMessage> marshaller = protocolContext.createMarshaller();
                 marshaller.marshal(msg, socket.getOutputStream());
-            } catch(final IOException ioe) {
+            } catch (final IOException ioe) {
                 throw new ProtocolException("Failed marshalling '" + msg.getType() + "' protocol message due to: " + ioe, ioe);
             }
         } finally {
@@ -183,37 +185,36 @@ public class ClusterManagerProtocolSenderImpl implements ClusterManagerProtocolS
 
     /**
      * Assigns the primary role to a node.
-     * 
+     *
      * @param msg a message
-     * 
+     *
      * @throws ProtocolException if the message failed to be sent
      */
     @Override
     public void assignPrimaryRole(final PrimaryRoleAssignmentMessage msg) throws ProtocolException {
         Socket socket = null;
         try {
-        	socket = createSocket(msg.getNodeId(), true);
+            socket = createSocket(msg.getNodeId(), true);
 
             try {
                 // marshal message to output stream
                 final ProtocolMessageMarshaller<ProtocolMessage> marshaller = protocolContext.createMarshaller();
                 marshaller.marshal(msg, socket.getOutputStream());
-            } catch(final IOException ioe) {
+            } catch (final IOException ioe) {
                 throw new ProtocolException("Failed marshalling '" + msg.getType() + "' protocol message due to: " + ioe, ioe);
             }
         } finally {
             SocketUtils.closeQuietly(socket);
         }
     }
-    
-    
+
     private void setConnectionHandshakeTimeoutOnSocket(final Socket socket) throws SocketException {
         // update socket timeout, if handshake timeout was set; otherwise use socket's current timeout
-        if(handshakeTimeoutSeconds >= 0) {
+        if (handshakeTimeoutSeconds >= 0) {
             socket.setSoTimeout(handshakeTimeoutSeconds * 1000);
-        }   
+        }
     }
-    
+
     public SocketConfiguration getSocketConfiguration() {
         return socketConfiguration;
     }
@@ -227,18 +228,18 @@ public class ClusterManagerProtocolSenderImpl implements ClusterManagerProtocolS
     }
 
     private Socket createSocket(final NodeIdentifier nodeId, final boolean applyHandshakeTimeout) {
-    	return createSocket(nodeId.getSocketAddress(), nodeId.getSocketPort(), applyHandshakeTimeout);
+        return createSocket(nodeId.getSocketAddress(), nodeId.getSocketPort(), applyHandshakeTimeout);
     }
-    
+
     private Socket createSocket(final String host, final int port, final boolean applyHandshakeTimeout) {
-    	try {
+        try {
             // create a socket
             final Socket socket = SocketUtils.createSocket(InetSocketAddress.createUnresolved(host, port), socketConfiguration);
-            if ( applyHandshakeTimeout ) {
-            	setConnectionHandshakeTimeoutOnSocket(socket);
+            if (applyHandshakeTimeout) {
+                setConnectionHandshakeTimeoutOnSocket(socket);
             }
             return socket;
-        } catch(final IOException ioe) {
+        } catch (final IOException ioe) {
             throw new ProtocolException("Failed to create socket due to: " + ioe, ioe);
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderListener.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderListener.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderListener.java
index 933e5fa..2837f1a 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderListener.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderListener.java
@@ -32,21 +32,21 @@ import org.apache.nifi.cluster.protocol.message.ReconnectionResponseMessage;
 import org.apache.nifi.reporting.BulletinRepository;
 
 /**
- * A wrapper class for consolidating a protocol sender and listener for the cluster
- * manager.
- * 
+ * A wrapper class for consolidating a protocol sender and listener for the
+ * cluster manager.
+ *
  * @author unattributed
  */
 public class ClusterManagerProtocolSenderListener implements ClusterManagerProtocolSender, ProtocolListener {
-    
+
     private final ClusterManagerProtocolSender sender;
-    
+
     private final ProtocolListener listener;
-    
+
     public ClusterManagerProtocolSenderListener(final ClusterManagerProtocolSender sender, final ProtocolListener listener) {
-        if(sender == null) {
+        if (sender == null) {
             throw new IllegalArgumentException("ClusterManagerProtocolSender may not be null.");
-        } else if(listener == null) {
+        } else if (listener == null) {
             throw new IllegalArgumentException("ProtocolListener may not be null.");
         }
         this.sender = sender;
@@ -55,7 +55,7 @@ public class ClusterManagerProtocolSenderListener implements ClusterManagerProto
 
     @Override
     public void stop() throws IOException {
-        if(!isRunning()) {
+        if (!isRunning()) {
             throw new IllegalStateException("Instance is already stopped.");
         }
         listener.stop();
@@ -63,7 +63,7 @@ public class ClusterManagerProtocolSenderListener implements ClusterManagerProto
 
     @Override
     public void start() throws IOException {
-        if(isRunning()) {
+        if (isRunning()) {
             throw new IllegalStateException("Instance is already started.");
         }
         listener.start();
@@ -88,13 +88,13 @@ public class ClusterManagerProtocolSenderListener implements ClusterManagerProto
     public void addHandler(final ProtocolHandler handler) {
         listener.addHandler(handler);
     }
-    
+
     @Override
     public void setBulletinRepository(final BulletinRepository bulletinRepository) {
         listener.setBulletinRepository(bulletinRepository);
         sender.setBulletinRepository(bulletinRepository);
     }
-    
+
     @Override
     public FlowResponseMessage requestFlow(final FlowRequestMessage msg) throws ProtocolException {
         return sender.requestFlow(msg);
@@ -109,10 +109,10 @@ public class ClusterManagerProtocolSenderListener implements ClusterManagerProto
     public void disconnect(DisconnectMessage msg) throws ProtocolException {
         sender.disconnect(msg);
     }
-    
+
     @Override
     public void assignPrimaryRole(PrimaryRoleAssignmentMessage msg) throws ProtocolException {
         sender.assignPrimaryRole(msg);
     }
-    
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceDiscovery.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceDiscovery.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceDiscovery.java
index 24e51e0..f808c83 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceDiscovery.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceDiscovery.java
@@ -45,7 +45,6 @@ import org.slf4j.LoggerFactory;
  * discovery. The instance must be stopped before termination of the JVM to
  * ensure proper resource clean-up.
  *
- * @author unattributed
  */
 public class ClusterServiceDiscovery implements MulticastServiceDiscovery, ProtocolListener {
 
@@ -60,7 +59,6 @@ public class ClusterServiceDiscovery implements MulticastServiceDiscovery, Proto
      */
     private DiscoverableService service;
 
-    
     public ClusterServiceDiscovery(final String serviceName, final InetSocketAddress multicastAddress,
             final MulticastConfiguration multicastConfiguration, final ProtocolContext<ProtocolMessage> protocolContext) {
 
@@ -162,7 +160,8 @@ public class ClusterServiceDiscovery implements MulticastServiceDiscovery, Proto
                                 || broadcastMsg.getPort() != oldService.getServiceAddress().getPort()) {
                             service = new DiscoverableServiceImpl(serviceName, InetSocketAddress.createUnresolved(broadcastMsg.getAddress(), broadcastMsg.getPort()));
                             final InetSocketAddress oldServiceAddress = (oldService == null) ? null : oldService.getServiceAddress();
-                            logger.info(String.format("Updating cluster service address for '%s' from '%s' to '%s'", serviceName, prettyPrint(oldServiceAddress), prettyPrint(service.getServiceAddress())));
+                            logger.info(String.format("Updating cluster service address for '%s' from '%s' to '%s'",
+                                    serviceName, prettyPrint(oldServiceAddress), prettyPrint(service.getServiceAddress())));
                         }
                     }
                     return null;

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceLocator.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceLocator.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceLocator.java
index bebfde8..64ca7fa 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceLocator.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceLocator.java
@@ -27,39 +27,39 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Implements the ServiceLocator interface for locating the socket address
- * of a cluster service.  Depending on configuration, the address may be located
- * using service discovery.  If using service discovery, then the service methods
- * must be used for starting and stopping discovery.
- * 
- * Service discovery may be used in conjunction with a fixed port.  In this case,
- * the service discovery will yield the service IP/host while the fixed port will
- * be used for the port.
- * 
+ * Implements the ServiceLocator interface for locating the socket address of a
+ * cluster service. Depending on configuration, the address may be located using
+ * service discovery. If using service discovery, then the service methods must
+ * be used for starting and stopping discovery.
+ *
+ * Service discovery may be used in conjunction with a fixed port. In this case,
+ * the service discovery will yield the service IP/host while the fixed port
+ * will be used for the port.
+ *
  * Alternatively, the instance may be configured with exact service location, in
- * which case, no service discovery occurs and the caller will always receive the
- * configured service.
- * 
+ * which case, no service discovery occurs and the caller will always receive
+ * the configured service.
+ *
  * @author unattributed
  */
 public class ClusterServiceLocator implements ServiceDiscovery {
-    
+
     private static final Logger logger = LoggerFactory.getLogger(ClusterServiceLocator.class);
-    
+
     private final String serviceName;
-    
+
     private final ClusterServiceDiscovery serviceDiscovery;
-    
+
     private final DiscoverableService fixedService;
 
     private final int fixedServicePort;
-    
+
     private final AttemptsConfig attemptsConfig = new AttemptsConfig();
-    
+
     private final AtomicBoolean running = new AtomicBoolean(false);
-    
+
     public ClusterServiceLocator(final ClusterServiceDiscovery serviceDiscovery) {
-        if(serviceDiscovery == null) {
+        if (serviceDiscovery == null) {
             throw new IllegalArgumentException("ClusterServiceDiscovery may not be null.");
         }
         this.serviceDiscovery = serviceDiscovery;
@@ -67,9 +67,9 @@ public class ClusterServiceLocator implements ServiceDiscovery {
         this.fixedServicePort = 0;
         this.serviceName = serviceDiscovery.getServiceName();
     }
-    
+
     public ClusterServiceLocator(final ClusterServiceDiscovery serviceDiscovery, final int fixedServicePort) {
-        if(serviceDiscovery == null) {
+        if (serviceDiscovery == null) {
             throw new IllegalArgumentException("ClusterServiceDiscovery may not be null.");
         }
         this.serviceDiscovery = serviceDiscovery;
@@ -77,9 +77,9 @@ public class ClusterServiceLocator implements ServiceDiscovery {
         this.fixedServicePort = fixedServicePort;
         this.serviceName = serviceDiscovery.getServiceName();
     }
-    
+
     public ClusterServiceLocator(final DiscoverableService fixedService) {
-        if(fixedService == null) {
+        if (fixedService == null) {
             throw new IllegalArgumentException("Service may not be null.");
         }
         this.serviceDiscovery = null;
@@ -87,30 +87,30 @@ public class ClusterServiceLocator implements ServiceDiscovery {
         this.fixedServicePort = 0;
         this.serviceName = fixedService.getServiceName();
     }
-    
+
     @Override
     public DiscoverableService getService() {
-        
+
         final int numAttemptsValue;
         final int secondsBetweenAttempts;
-        synchronized(this) {
+        synchronized (this) {
             numAttemptsValue = attemptsConfig.numAttempts;
             secondsBetweenAttempts = attemptsConfig.getTimeBetweenAttempts();
         }
-        
+
         // try for a configured amount of attempts to retrieve the service address
-        for(int i = 0; i < numAttemptsValue; i++) {
+        for (int i = 0; i < numAttemptsValue; i++) {
 
-            if(fixedService != null) {
+            if (fixedService != null) {
                 return fixedService;
-            } else if(serviceDiscovery != null) {
-                
+            } else if (serviceDiscovery != null) {
+
                 final DiscoverableService discoveredService = serviceDiscovery.getService();
-                
+
                 // if we received an address
-                if(discoveredService != null) {
+                if (discoveredService != null) {
                     // if we were configured with a fixed port, then use the discovered host and fixed port; otherwise use the discovered address
-                    if(fixedServicePort > 0) {
+                    if (fixedServicePort > 0) {
                         // create service using discovered service name and address with fixed service port
                         final InetSocketAddress addr = InetSocketAddress.createUnresolved(discoveredService.getServiceAddress().getHostName(), fixedServicePort);
                         final DiscoverableService result = new DiscoverableServiceImpl(discoveredService.getServiceName(), addr);
@@ -120,23 +120,23 @@ public class ClusterServiceLocator implements ServiceDiscovery {
                     }
                 }
             }
-            
+
             // could not obtain service address, so sleep a bit
             try {
-                logger.debug(String.format("Locating Cluster Service '%s' Attempt: %d of %d failed.  Trying again in %d seconds.", 
-                    serviceName, (i + 1), numAttemptsValue, secondsBetweenAttempts));
+                logger.debug(String.format("Locating Cluster Service '%s' Attempt: %d of %d failed.  Trying again in %d seconds.",
+                        serviceName, (i + 1), numAttemptsValue, secondsBetweenAttempts));
                 Thread.sleep(secondsBetweenAttempts * 1000);
-            } catch(final InterruptedException ie) {
+            } catch (final InterruptedException ie) {
                 break;
             }
-            
+
         }
 
         return null;
     }
 
     public boolean isRunning() {
-        if(serviceDiscovery != null) {
+        if (serviceDiscovery != null) {
             return serviceDiscovery.isRunning();
         } else {
             return running.get();
@@ -144,31 +144,31 @@ public class ClusterServiceLocator implements ServiceDiscovery {
     }
 
     public void start() throws IOException {
-        
-        if(isRunning()) {
+
+        if (isRunning()) {
             throw new IllegalStateException("Instance is already started.");
         }
-        
-        if(serviceDiscovery != null) {
+
+        if (serviceDiscovery != null) {
             serviceDiscovery.start();
         }
         running.set(true);
     }
 
     public void stop() throws IOException {
-        
-        if(isRunning() == false) {
+
+        if (isRunning() == false) {
             throw new IllegalStateException("Instance is already stopped.");
         }
-        
-        if(serviceDiscovery != null) {
+
+        if (serviceDiscovery != null) {
             serviceDiscovery.stop();
         }
         running.set(false);
     }
-    
+
     public synchronized void setAttemptsConfig(final AttemptsConfig config) {
-        if(config == null) {
+        if (config == null) {
             throw new IllegalArgumentException("Attempts configuration may not be null.");
         }
         this.attemptsConfig.numAttempts = config.numAttempts;
@@ -183,21 +183,21 @@ public class ClusterServiceLocator implements ServiceDiscovery {
         config.timeBetweenAttempsUnit = this.attemptsConfig.timeBetweenAttempsUnit;
         return config;
     }
-    
+
     public static class AttemptsConfig {
-        
+
         private int numAttempts = 1;
-        
+
         private int timeBetweenAttempts = 1;
-        
+
         private TimeUnit timeBetweenAttempsUnit = TimeUnit.SECONDS;
-        
+
         public int getNumAttempts() {
             return numAttempts;
         }
 
         public void setNumAttempts(int numAttempts) {
-            if(numAttempts <= 0) {
+            if (numAttempts <= 0) {
                 throw new IllegalArgumentException("Number of attempts must be positive: " + numAttempts);
             }
             this.numAttempts = numAttempts;
@@ -208,9 +208,9 @@ public class ClusterServiceLocator implements ServiceDiscovery {
         }
 
         public void setTimeBetweenAttempsUnit(TimeUnit timeBetweenAttempsUnit) {
-            if(timeBetweenAttempts <= 0) {
+            if (timeBetweenAttempts <= 0) {
                 throw new IllegalArgumentException("Time between attempts must be positive: " + numAttempts);
-            } 
+            }
             this.timeBetweenAttempsUnit = timeBetweenAttempsUnit;
         }
 
@@ -219,9 +219,9 @@ public class ClusterServiceLocator implements ServiceDiscovery {
         }
 
         public void setTimeBetweenAttempts(int timeBetweenAttempts) {
-            if(timeBetweenAttempts <= 0) {
-            throw new IllegalArgumentException("Time between attempts must be positive: " + numAttempts);
-        } 
+            if (timeBetweenAttempts <= 0) {
+                throw new IllegalArgumentException("Time between attempts must be positive: " + numAttempts);
+            }
             this.timeBetweenAttempts = timeBetweenAttempts;
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServicesBroadcaster.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServicesBroadcaster.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServicesBroadcaster.java
index e9e7d5b..3458760 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServicesBroadcaster.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServicesBroadcaster.java
@@ -21,7 +21,10 @@ import java.io.IOException;
 import java.net.DatagramPacket;
 import java.net.InetSocketAddress;
 import java.net.MulticastSocket;
-import java.util.*;
+import java.util.Collections;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.TimeUnit;
 
@@ -39,75 +42,76 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Broadcasts services used by the clustering software using multicast communication.
- * A configurable delay occurs after broadcasting the collection of services.
- * 
+ * Broadcasts services used by the clustering software using multicast
+ * communication. A configurable delay occurs after broadcasting the collection
+ * of services.
+ *
  * The client caller is responsible for starting and stopping the broadcasting.
  * The instance must be stopped before termination of the JVM to ensure proper
  * resource clean-up.
- * 
+ *
  * @author unattributed
  */
 public class ClusterServicesBroadcaster implements MulticastServicesBroadcaster {
-    
+
     private static final Logger logger = new NiFiLog(LoggerFactory.getLogger(ClusterServicesBroadcaster.class));
-    
+
     private final Set<DiscoverableService> services = new CopyOnWriteArraySet<>();
 
     private final InetSocketAddress multicastAddress;
-    
+
     private final MulticastConfiguration multicastConfiguration;
-    
+
     private final ProtocolContext<ProtocolMessage> protocolContext;
-    
+
     private final int broadcastDelayMs;
-    
+
     private Timer broadcaster;
-    
+
     private MulticastSocket multicastSocket;
-    
-    public ClusterServicesBroadcaster(final InetSocketAddress multicastAddress, 
-            final MulticastConfiguration multicastConfiguration, 
+
+    public ClusterServicesBroadcaster(final InetSocketAddress multicastAddress,
+            final MulticastConfiguration multicastConfiguration,
             final ProtocolContext<ProtocolMessage> protocolContext, final String broadcastDelay) {
-        
-        if(multicastAddress == null) {
+
+        if (multicastAddress == null) {
             throw new IllegalArgumentException("Multicast address may not be null.");
-        } else if(multicastAddress.getAddress().isMulticastAddress() == false) {
+        } else if (multicastAddress.getAddress().isMulticastAddress() == false) {
             throw new IllegalArgumentException("Multicast group address is not a Class D IP address.");
-        } else if(protocolContext == null) {
+        } else if (protocolContext == null) {
             throw new IllegalArgumentException("Protocol Context may not be null.");
-        } else if(multicastConfiguration == null) {
+        } else if (multicastConfiguration == null) {
             throw new IllegalArgumentException("Multicast configuration may not be null.");
         }
-        
+
         this.services.addAll(services);
         this.multicastAddress = multicastAddress;
         this.multicastConfiguration = multicastConfiguration;
         this.protocolContext = protocolContext;
         this.broadcastDelayMs = (int) FormatUtils.getTimeDuration(broadcastDelay, TimeUnit.MILLISECONDS);
     }
-    
+
     public void start() throws IOException {
 
-        if(isRunning()) {
+        if (isRunning()) {
             throw new IllegalStateException("Instance is already started.");
         }
-        
+
         // setup socket
         multicastSocket = MulticastUtils.createMulticastSocket(multicastConfiguration);
-        
+
         // setup broadcaster
         broadcaster = new Timer("Cluster Services Broadcaster", /* is daemon */ true);
         broadcaster.schedule(new TimerTask() {
             @Override
             public void run() {
-                for(final DiscoverableService service : services) {
+                for (final DiscoverableService service : services) {
                     try {
 
                         final InetSocketAddress serviceAddress = service.getServiceAddress();
-                        logger.debug(String.format("Broadcasting Cluster Service '%s' at address %s:%d", 
-                            service.getServiceName(), serviceAddress.getHostName(), serviceAddress.getPort()));
-                        
+                        logger.debug(String.format("Broadcasting Cluster Service '%s' at address %s:%d",
+                                service.getServiceName(), serviceAddress.getHostName(), serviceAddress.getPort()));
+
                         // create message
                         final ServiceBroadcastMessage msg = new ServiceBroadcastMessage();
                         msg.setServiceName(service.getServiceName());
@@ -124,37 +128,37 @@ public class ClusterServicesBroadcaster implements MulticastServicesBroadcaster
                         final DatagramPacket packet = new DatagramPacket(packetBytes, packetBytes.length, multicastAddress);
                         multicastSocket.send(packet);
 
-                    } catch(final Exception ex) {
+                    } catch (final Exception ex) {
                         logger.warn(String.format("Cluster Services Broadcaster failed broadcasting service '%s' due to: %s", service.getServiceName(), ex), ex);
                     }
                 }
             }
         }, 0, broadcastDelayMs);
     }
-    
+
     public boolean isRunning() {
         return (broadcaster != null);
     }
-    
+
     public void stop() {
-        
-        if(isRunning() == false) {
+
+        if (isRunning() == false) {
             throw new IllegalStateException("Instance is already stopped.");
         }
-        
+
         broadcaster.cancel();
         broadcaster = null;
 
         // close socket
         MulticastUtils.closeQuietly(multicastSocket);
-        
+
     }
 
     @Override
     public int getBroadcastDelayMs() {
         return broadcastDelayMs;
     }
-    
+
     @Override
     public Set<DiscoverableService> getServices() {
         return Collections.unmodifiableSet(services);
@@ -164,16 +168,16 @@ public class ClusterServicesBroadcaster implements MulticastServicesBroadcaster
     public InetSocketAddress getMulticastAddress() {
         return multicastAddress;
     }
-    
+
     @Override
     public boolean addService(final DiscoverableService service) {
         return services.add(service);
     }
-    
+
     @Override
     public boolean removeService(final String serviceName) {
-        for(final DiscoverableService service : services) {
-            if(service.getServiceName().equals(serviceName)) {
+        for (final DiscoverableService service : services) {
+            if (service.getServiceName().equals(serviceName)) {
                 return services.remove(service);
             }
         }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/CopyingInputStream.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/CopyingInputStream.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/CopyingInputStream.java
index 680df65..7ac17ab 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/CopyingInputStream.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/CopyingInputStream.java
@@ -23,6 +23,7 @@ import java.io.InputStream;
 import java.io.OutputStream;
 
 public class CopyingInputStream extends FilterInputStream {
+
     private final ByteArrayOutputStream baos = new ByteArrayOutputStream();
     private final int maxBytesToCopy;
     private final InputStream in;
@@ -32,45 +33,45 @@ public class CopyingInputStream extends FilterInputStream {
         this.maxBytesToCopy = maxBytesToCopy;
         this.in = in;
     }
-    
+
     @Override
     public int read() throws IOException {
         final int delegateRead = in.read();
-        if ( delegateRead != -1 && getNumberOfBytesCopied() < maxBytesToCopy ) {
+        if (delegateRead != -1 && getNumberOfBytesCopied() < maxBytesToCopy) {
             baos.write(delegateRead);
         }
-        
+
         return delegateRead;
     }
-    
+
     @Override
     public int read(byte[] b) throws IOException {
         final int delegateRead = in.read(b);
-        if ( delegateRead >= 0 ) {
+        if (delegateRead >= 0) {
             baos.write(b, 0, Math.min(delegateRead, maxBytesToCopy - getNumberOfBytesCopied()));
         }
-        
+
         return delegateRead;
     }
-    
+
     @Override
     public int read(byte[] b, int off, int len) throws IOException {
         final int delegateRead = in.read(b, off, len);
-        if ( delegateRead >= 0 ) {
+        if (delegateRead >= 0) {
             baos.write(b, off, Math.min(delegateRead, maxBytesToCopy - getNumberOfBytesCopied()));
         }
-        
+
         return delegateRead;
     }
-    
+
     public byte[] getBytesRead() {
         return baos.toByteArray();
     }
-    
+
     public void writeBytes(final OutputStream out) throws IOException {
         baos.writeTo(out);
     }
-    
+
     public int getNumberOfBytesCopied() {
         return baos.size();
     }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/MulticastProtocolListener.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/MulticastProtocolListener.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/MulticastProtocolListener.java
index d3764b3..90f9124 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/MulticastProtocolListener.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/MulticastProtocolListener.java
@@ -45,20 +45,20 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Implements a listener for protocol messages sent over multicast.  If a message
+ * Implements a listener for protocol messages sent over multicast. If a message
  * is of type MulticastProtocolMessage, then the underlying protocol message is
- * passed to the handler.  If the receiving handler produces a message response,
- * then the message is wrapped with a MulticastProtocolMessage before being 
- * sent to the originator.
- * 
- * The client caller is responsible for starting and stopping the listener.
- * The instance must be stopped before termination of the JVM to ensure proper
+ * passed to the handler. If the receiving handler produces a message response,
+ * then the message is wrapped with a MulticastProtocolMessage before being sent
+ * to the originator.
+ *
+ * The client caller is responsible for starting and stopping the listener. The
+ * instance must be stopped before termination of the JVM to ensure proper
  * resource clean-up.
- * 
+ *
  * @author unattributed
  */
 public class MulticastProtocolListener extends MulticastListener implements ProtocolListener {
-    
+
     private static final Logger logger = new NiFiLog(LoggerFactory.getLogger(MulticastProtocolListener.class));
 
     // immutable members
@@ -74,7 +74,7 @@ public class MulticastProtocolListener extends MulticastListener implements Prot
             final ProtocolContext<ProtocolMessage> protocolContext) {
 
         super(numThreads, multicastAddress, configuration);
-        
+
         if (protocolContext == null) {
             throw new IllegalArgumentException("Protocol Context may not be null.");
         }
@@ -89,21 +89,21 @@ public class MulticastProtocolListener extends MulticastListener implements Prot
     @Override
     public void start() throws IOException {
 
-        if(super.isRunning()) {
+        if (super.isRunning()) {
             throw new IllegalStateException("Instance is already started.");
         }
-        
+
         super.start();
-        
+
     }
 
     @Override
     public void stop() throws IOException {
 
-        if(super.isRunning() == false) {
+        if (super.isRunning() == false) {
             throw new IllegalStateException("Instance is already stopped.");
         }
-        
+
         // shutdown listener
         super.stop();
 
@@ -116,17 +116,17 @@ public class MulticastProtocolListener extends MulticastListener implements Prot
 
     @Override
     public void addHandler(final ProtocolHandler handler) {
-        if(handler == null) {
+        if (handler == null) {
             throw new NullPointerException("Protocol handler may not be null.");
         }
         handlers.add(handler);
     }
-    
+
     @Override
     public boolean removeHandler(final ProtocolHandler handler) {
         return handlers.remove(handler);
     }
-    
+
     @Override
     public void dispatchRequest(final MulticastSocket multicastSocket, final DatagramPacket packet) {
 
@@ -138,10 +138,10 @@ public class MulticastProtocolListener extends MulticastListener implements Prot
 
             // unwrap multicast message, if necessary
             final ProtocolMessage unwrappedRequest;
-            if(request instanceof MulticastProtocolMessage) {
+            if (request instanceof MulticastProtocolMessage) {
                 final MulticastProtocolMessage multicastRequest = (MulticastProtocolMessage) request;
                 // don't process a message we sent
-                if(listenerId.equals(multicastRequest.getId())) {
+                if (listenerId.equals(multicastRequest.getId())) {
                     return;
                 } else {
                     unwrappedRequest = multicastRequest.getProtocolMessage();
@@ -149,7 +149,7 @@ public class MulticastProtocolListener extends MulticastListener implements Prot
             } else {
                 unwrappedRequest = request;
             }
-            
+
             // dispatch message to handler
             ProtocolHandler desiredHandler = null;
             for (final ProtocolHandler handler : getHandlers()) {
@@ -164,28 +164,28 @@ public class MulticastProtocolListener extends MulticastListener implements Prot
                 throw new ProtocolException("No handler assigned to handle message type: " + request.getType());
             } else {
                 final ProtocolMessage response = desiredHandler.handle(request);
-                if(response != null) {
+                if (response != null) {
                     try {
-                        
+
                         // wrap with listener id
                         final MulticastProtocolMessage multicastResponse = new MulticastProtocolMessage(listenerId, response);
-                        
+
                         // marshal message
                         final ByteArrayOutputStream baos = new ByteArrayOutputStream();
                         final ProtocolMessageMarshaller<ProtocolMessage> marshaller = protocolContext.createMarshaller();
                         marshaller.marshal(multicastResponse, baos);
                         final byte[] responseBytes = baos.toByteArray();
-                        
+
                         final int maxPacketSizeBytes = getMaxPacketSizeBytes();
-                        if(responseBytes.length > maxPacketSizeBytes) {
-                            logger.warn("Cluster protocol handler '" + desiredHandler.getClass() + 
-                                "' produced a multicast response with length greater than configured max packet size '" + maxPacketSizeBytes + "'");
+                        if (responseBytes.length > maxPacketSizeBytes) {
+                            logger.warn("Cluster protocol handler '" + desiredHandler.getClass()
+                                    + "' produced a multicast response with length greater than configured max packet size '" + maxPacketSizeBytes + "'");
                         }
-                        
+
                         // create and send packet
-                        final DatagramPacket responseDatagram = new DatagramPacket(responseBytes, responseBytes.length, getMulticastAddress().getAddress(), getMulticastAddress().getPort()); 
+                        final DatagramPacket responseDatagram = new DatagramPacket(responseBytes, responseBytes.length, getMulticastAddress().getAddress(), getMulticastAddress().getPort());
                         multicastSocket.send(responseDatagram);
-                        
+
                     } catch (final IOException ioe) {
                         throw new ProtocolException("Failed marshalling protocol message in response to message type: " + request.getType() + " due to: " + ioe, ioe);
                     }
@@ -194,8 +194,8 @@ public class MulticastProtocolListener extends MulticastListener implements Prot
 
         } catch (final Throwable t) {
             logger.warn("Failed processing protocol message due to " + t, t);
-            
-            if ( bulletinRepository != null ) {
+
+            if (bulletinRepository != null) {
                 final Bulletin bulletin = BulletinFactory.createBulletin("Clustering", "WARNING", "Failed to process Protocol Message due to " + t.toString());
                 bulletinRepository.addBulletin(bulletin);
             }


Mime
View raw message