http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ConnectionResponse.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ConnectionResponse.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ConnectionResponse.java
index 7a5ff2b..4e06926 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ConnectionResponse.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ConnectionResponse.java
@@ -21,13 +21,12 @@ import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
import org.apache.nifi.cluster.protocol.jaxb.message.ConnectionResponseAdapter;
/**
- * The cluster manager's response to a node's connection request. If the manager
- * has a current copy of the data flow, then it is returned with a node identifier
- * to the node. Otherwise, the manager will provide a "try again in X seconds"
- * response to the node in hopes that a current data flow will be available upon
- * subsequent requests.
- *
- * @author unattributed
+ * The cluster manager's response to a node's connection request. If the manager
+ * has a current copy of the data flow, then it is returned with a node
+ * identifier to the node. Otherwise, the manager will provide a "try again in X
+ * seconds" response to the node in hopes that a current data flow will be
+ * available upon subsequent requests.
+ *
*/
@XmlJavaTypeAdapter(ConnectionResponseAdapter.class)
public class ConnectionResponse {
@@ -40,14 +39,14 @@ public class ConnectionResponse {
private final Integer managerRemoteInputPort;
private final Boolean managerRemoteCommsSecure;
private final String instanceId;
-
+
private volatile String clusterManagerDN;
-
- public ConnectionResponse(final NodeIdentifier nodeIdentifier, final StandardDataFlow dataFlow, final boolean primary,
- final Integer managerRemoteInputPort, final Boolean managerRemoteCommsSecure, final String instanceId) {
- if(nodeIdentifier == null) {
+
+ public ConnectionResponse(final NodeIdentifier nodeIdentifier, final StandardDataFlow dataFlow, final boolean primary,
+ final Integer managerRemoteInputPort, final Boolean managerRemoteCommsSecure, final String instanceId) {
+ if (nodeIdentifier == null) {
throw new IllegalArgumentException("Node identifier may not be empty or null.");
- } else if(dataFlow == null) {
+ } else if (dataFlow == null) {
throw new IllegalArgumentException("DataFlow may not be null.");
}
this.nodeIdentifier = nodeIdentifier;
@@ -59,9 +58,9 @@ public class ConnectionResponse {
this.managerRemoteCommsSecure = managerRemoteCommsSecure;
this.instanceId = instanceId;
}
-
+
public ConnectionResponse(final int tryLaterSeconds) {
- if(tryLaterSeconds <= 0) {
+ if (tryLaterSeconds <= 0) {
throw new IllegalArgumentException("Try-Later seconds may not be nonnegative: " + tryLaterSeconds);
}
this.dataFlow = null;
@@ -84,19 +83,19 @@ public class ConnectionResponse {
this.managerRemoteCommsSecure = null;
this.instanceId = null;
}
-
+
public static ConnectionResponse createBlockedByFirewallResponse() {
return new ConnectionResponse();
}
-
+
public boolean isPrimary() {
return primary;
}
-
+
public boolean shouldTryLater() {
return tryLaterSeconds > 0;
}
-
+
public boolean isBlockedByFirewall() {
return blockedByFirewall;
}
@@ -104,11 +103,11 @@ public class ConnectionResponse {
public int getTryLaterSeconds() {
return tryLaterSeconds;
}
-
+
public StandardDataFlow getDataFlow() {
return dataFlow;
}
-
+
public NodeIdentifier getNodeIdentifier() {
return nodeIdentifier;
}
@@ -116,23 +115,22 @@ public class ConnectionResponse {
public Integer getManagerRemoteInputPort() {
return managerRemoteInputPort;
}
-
+
public Boolean isManagerRemoteCommsSecure() {
return managerRemoteCommsSecure;
}
-
+
public String getInstanceId() {
return instanceId;
}
-
+
public void setClusterManagerDN(final String dn) {
this.clusterManagerDN = dn;
}
-
+
/**
- * Returns the DN of the NCM, if it is available or <code>null</code> otherwise.
- *
- * @return
+ * @return the DN of the NCM, if it is available or <code>null</code>
+ * otherwise
*/
public String getClusterManagerDN() {
return clusterManagerDN;
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/Heartbeat.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/Heartbeat.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/Heartbeat.java
index 67324a1..04fb3f0 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/Heartbeat.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/Heartbeat.java
@@ -23,44 +23,45 @@ import org.apache.nifi.cluster.protocol.jaxb.message.HeartbeatAdapter;
/**
* A heartbeat for indicating the status of a node to the cluster.
+ *
* @author unattributed
*/
@XmlJavaTypeAdapter(HeartbeatAdapter.class)
public class Heartbeat {
-
+
private final NodeIdentifier nodeIdentifier;
private final boolean primary;
private final boolean connected;
private final long createdTimestamp;
private final byte[] payload;
-
+
public Heartbeat(final NodeIdentifier nodeIdentifier, final boolean primary, final boolean connected, final byte[] payload) {
- if(nodeIdentifier == null) {
+ if (nodeIdentifier == null) {
throw new IllegalArgumentException("Node Identifier may not be null.");
- }
+ }
this.nodeIdentifier = nodeIdentifier;
this.primary = primary;
this.connected = connected;
this.payload = payload;
this.createdTimestamp = new Date().getTime();
}
-
+
public NodeIdentifier getNodeIdentifier() {
return nodeIdentifier;
}
-
+
public byte[] getPayload() {
return payload;
}
-
+
public boolean isPrimary() {
return primary;
}
-
+
public boolean isConnected() {
return connected;
}
-
+
@XmlTransient
public long getCreatedTimestamp() {
return createdTimestamp;
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeBulletins.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeBulletins.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeBulletins.java
index a120524..86df107 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeBulletins.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeBulletins.java
@@ -40,5 +40,5 @@ public class NodeBulletins {
public byte[] getPayload() {
return payload;
}
-
+
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeIdentifier.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeIdentifier.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeIdentifier.java
index 1893186..4b10be6 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeIdentifier.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeIdentifier.java
@@ -19,56 +19,68 @@ package org.apache.nifi.cluster.protocol;
import org.apache.commons.lang3.StringUtils;
/**
- * A node identifier denoting the coordinates of a flow controller that is connected
- * to a cluster. Nodes provide an external public API interface and an internal private
- * interface for communicating with the cluster.
- *
- * The external API interface and internal protocol each require an IP or hostname
- * as well as a port for communicating.
- *
+ * A node identifier denoting the coordinates of a flow controller that is
+ * connected to a cluster. Nodes provide an external public API interface and an
+ * internal private interface for communicating with the cluster.
+ *
+ * The external API interface and internal protocol each require an IP or
+ * hostname as well as a port for communicating.
+ *
* This class overrides hashCode and equals and considers two instances to be
* equal if they have the equal IDs.
- *
+ *
* @author unattributed
* @Immutable
* @Threadsafe
*/
public class NodeIdentifier {
-
- /** the unique identifier for the node */
+
+ /**
+ * the unique identifier for the node
+ */
private final String id;
-
- /** the IP or hostname to use for sending requests to the node's external interface */
+
+ /**
+ * the IP or hostname to use for sending requests to the node's external
+ * interface
+ */
private final String apiAddress;
-
- /** the port to use use for sending requests to the node's external interface */
- private final int apiPort;
-
- /** the IP or hostname to use for sending requests to the node's internal interface */
+
+ /**
+ * the port to use for sending requests to the node's external interface
+ */
+ private final int apiPort;
+
+ /**
+ * the IP or hostname to use for sending requests to the node's internal
+ * interface
+ */
private final String socketAddress;
-
- /** the port to use use for sending requests to the node's internal interface */
+
+ /**
+ * the port to use for sending requests to the node's internal interface
+ */
private final int socketPort;
-
+
private final String nodeDn;
public NodeIdentifier(final String id, final String apiAddress, final int apiPort, final String socketAddress, final int socketPort) {
this(id, apiAddress, apiPort, socketAddress, socketPort, null);
}
-
+
public NodeIdentifier(final String id, final String apiAddress, final int apiPort, final String socketAddress, final int socketPort, final String dn) {
-
- if(StringUtils.isBlank(id)) {
+
+ if (StringUtils.isBlank(id)) {
throw new IllegalArgumentException("Node ID may not be empty or null.");
- } else if(StringUtils.isBlank(apiAddress)) {
+ } else if (StringUtils.isBlank(apiAddress)) {
throw new IllegalArgumentException("Node API address may not be empty or null.");
- } else if(StringUtils.isBlank(socketAddress)) {
+ } else if (StringUtils.isBlank(socketAddress)) {
throw new IllegalArgumentException("Node socket address may not be empty or null.");
- }
-
+ }
+
validatePort(apiPort);
validatePort(socketPort);
-
+
this.id = id;
this.apiAddress = apiAddress;
this.apiPort = apiPort;
@@ -80,11 +92,11 @@ public class NodeIdentifier {
public String getId() {
return id;
}
-
+
public String getDN() {
return nodeDn;
}
-
+
public String getApiAddress() {
return apiAddress;
}
@@ -96,22 +108,22 @@ public class NodeIdentifier {
public String getSocketAddress() {
return socketAddress;
}
-
+
public int getSocketPort() {
return socketPort;
}
-
+
private void validatePort(final int port) {
- if(port < 1 || port > 65535) {
+ if (port < 1 || port > 65535) {
throw new IllegalArgumentException("Port must be inclusively in the range [1, 65535]. Port given: " + port);
- }
+ }
}
-
+
/**
* Compares the id of two node identifiers for equality.
- *
+ *
* @param obj a node identifier
- *
+ *
* @return true if the id is equal; false otherwise
*/
@Override
@@ -130,33 +142,33 @@ public class NodeIdentifier {
}
/**
- * Compares API address/port and socket address/port for equality. The
- * id is not used for comparison.
- *
+ * Compares API address/port and socket address/port for equality. The id is
+ * not used for comparison.
+ *
* @param other a node identifier
- *
+ *
* @return true if API address/port and socket address/port are equal; false
* otherwise
*/
public boolean logicallyEquals(final NodeIdentifier other) {
- if(other == null) {
+ if (other == null) {
return false;
}
if ((this.apiAddress == null) ? (other.apiAddress != null) : !this.apiAddress.equals(other.apiAddress)) {
return false;
}
- if(this.apiPort != other.apiPort) {
+ if (this.apiPort != other.apiPort) {
return false;
}
if ((this.socketAddress == null) ? (other.socketAddress != null) : !this.socketAddress.equals(other.socketAddress)) {
return false;
}
- if(this.socketPort != other.socketPort) {
+ if (this.socketPort != other.socketPort) {
return false;
}
return true;
}
-
+
@Override
public int hashCode() {
int hash = 7;
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeProtocolSender.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeProtocolSender.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeProtocolSender.java
index 1edcb91..f3e5df4 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeProtocolSender.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeProtocolSender.java
@@ -24,50 +24,61 @@ import org.apache.nifi.cluster.protocol.message.NodeBulletinsMessage;
import org.apache.nifi.cluster.protocol.message.ReconnectionFailureMessage;
/**
- * An interface for sending protocol messages from a node to the cluster manager.
- * @author unattributed
+ * An interface for sending protocol messages from a node to the cluster
+ * manager.
+ *
*/
public interface NodeProtocolSender {
-
+
/**
* Sends a "connection request" message to the cluster manager.
+ *
* @param msg a message
* @return the response
- * @throws UnknownServiceAddressException if the cluster manager's address is not known
+ * @throws UnknownServiceAddressException if the cluster manager's address
+ * is not known
* @throws ProtocolException if communication failed
*/
ConnectionResponseMessage requestConnection(ConnectionRequestMessage msg) throws ProtocolException, UnknownServiceAddressException;
-
+
/**
* Sends a "heartbeat" message to the cluster manager.
+ *
* @param msg a message
- * @throws UnknownServiceAddressException if the cluster manager's address is not known
+ * @throws UnknownServiceAddressException if the cluster manager's address
+ * is not known
* @throws ProtocolException if communication failed
*/
void heartbeat(HeartbeatMessage msg) throws ProtocolException, UnknownServiceAddressException;
-
+
/**
* Sends a bulletins message to the cluster manager.
- * @param msg
- * @throws ProtocolException
- * @throws UnknownServiceAddressException
+ *
+ * @param msg a message
+ * @throws ProtocolException if communication failed
+ * @throws UnknownServiceAddressException if the cluster manager's address is not known
*/
void sendBulletins(NodeBulletinsMessage msg) throws ProtocolException, UnknownServiceAddressException;
-
+
/**
* Sends a failure notification if the controller was unable start.
+ *
* @param msg a message
- * @throws UnknownServiceAddressException if the cluster manager's address is not known
+ * @throws UnknownServiceAddressException if the cluster manager's address
+ * is not known
* @throws ProtocolException if communication failed
*/
void notifyControllerStartupFailure(ControllerStartupFailureMessage msg) throws ProtocolException, UnknownServiceAddressException;
-
+
/**
- * Sends a failure notification if the node was unable to reconnect to the cluster
+ * Sends a failure notification if the node was unable to reconnect to the
+ * cluster
+ *
* @param msg a message
- * @throws UnknownServiceAddressException if the cluster manager's address is not known
+ * @throws UnknownServiceAddressException if the cluster manager's address
+ * is not known
* @throws ProtocolException if communication failed
*/
void notifyReconnectionFailure(ReconnectionFailureMessage msg) throws ProtocolException, UnknownServiceAddressException;
-
+
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolContext.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolContext.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolContext.java
index b614e76..11a3912 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolContext.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolContext.java
@@ -17,22 +17,24 @@
package org.apache.nifi.cluster.protocol;
/**
- * The context for communicating using the internal cluster protocol.
- *
+ * The context for communicating using the internal cluster protocol.
+ *
* @param <T> The type of protocol message.
- *
+ *
* @author unattributed
*/
public interface ProtocolContext<T> {
-
+
/**
* Creates a marshaller for serializing protocol messages.
+ *
* @return a marshaller
*/
ProtocolMessageMarshaller<T> createMarshaller();
-
+
/**
* Creates an unmarshaller for deserializing protocol messages.
+ *
* @return a unmarshaller
*/
ProtocolMessageUnmarshaller<T> createUnmarshaller();
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolException.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolException.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolException.java
index f11ad84..b6c3737 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolException.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolException.java
@@ -19,21 +19,22 @@ package org.apache.nifi.cluster.protocol;
/**
* The base exception for problems encountered while communicating within the
* cluster.
+ *
* @author unattributed
*/
public class ProtocolException extends RuntimeException {
-
+
public ProtocolException() {
}
-
+
public ProtocolException(String msg) {
super(msg);
}
-
+
public ProtocolException(Throwable cause) {
super(cause);
}
-
+
public ProtocolException(String msg, Throwable cause) {
super(msg, cause);
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolHandler.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolHandler.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolHandler.java
index 6de87db..b2bace9 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolHandler.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolHandler.java
@@ -20,25 +20,26 @@ import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
/**
* A handler for processing protocol messages.
- * @author unattributed
+ *
*/
public interface ProtocolHandler {
-
+
/**
* Handles the given protocol message or throws an exception if it cannot
- * handle the message. If no response is needed by the protocol, then null
+ * handle the message. If no response is needed by the protocol, then null
* should be returned.
- *
+ *
* @param msg a message
* @return a response or null, if no response is necessary
- *
+ *
* @throws ProtocolException if the message could not be processed
*/
ProtocolMessage handle(ProtocolMessage msg) throws ProtocolException;
-
+
/**
- * @param msg
- * @return true if the handler can process the given message; false otherwise
+ * @param msg a message
+ * @return true if the handler can process the given message; false
+ * otherwise
*/
boolean canHandle(ProtocolMessage msg);
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolListener.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolListener.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolListener.java
index 32f0f5d..2f35241 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolListener.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolListener.java
@@ -23,48 +23,53 @@ import org.apache.nifi.reporting.BulletinRepository;
/**
* Defines the interface for a listener to process protocol messages.
- * @author unattributed
+ *
*/
public interface ProtocolListener {
-
+
/**
- * Starts the instance for listening for messages. Start may only be called
+ * Starts the instance for listening for messages. Start may only be called
* if the instance is not running.
- * @throws java.io.IOException
+ *
+ * @throws java.io.IOException if an I/O error occurs while starting the instance
*/
void start() throws IOException;
-
+
/**
- * Stops the instance from listening for messages. Stop may only be called
+ * Stops the instance from listening for messages. Stop may only be called
* if the instance is running.
- * @throws java.io.IOException
+ *
+ * @throws java.io.IOException if an I/O error occurs while stopping the instance
*/
void stop() throws IOException;
-
+
/**
* @return true if the instance is started; false otherwise.
*/
boolean isRunning();
-
+
/**
* @return the handlers registered with the listener
*/
Collection<ProtocolHandler> getHandlers();
-
+
/**
* Registers a handler with the listener.
+ *
* @param handler a handler
*/
void addHandler(ProtocolHandler handler);
-
+
/**
* Sets the BulletinRepository that can be used to report bulletins
- * @param bulletinRepository
+ *
+ * @param bulletinRepository the repository that can be used to report bulletins
*/
void setBulletinRepository(BulletinRepository bulletinRepository);
-
+
/**
* Unregisters the handler with the listener.
+ *
* @param handler a handler
* @return true if the handler was removed; false otherwise
*/
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolMessageMarshaller.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolMessageMarshaller.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolMessageMarshaller.java
index bb436e0..4e43d4d 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolMessageMarshaller.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolMessageMarshaller.java
@@ -21,15 +21,16 @@ import java.io.OutputStream;
/**
* Defines a marshaller for serializing protocol messages.
- *
+ *
* @param <T> The type of protocol message.
- *
+ *
* @author unattributed
*/
public interface ProtocolMessageMarshaller<T> {
-
+
/**
* Serializes the given message to the given output stream.
+ *
* @param msg a message
* @param os an output stream
* @throws IOException if the message could not be serialized to the stream
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolMessageUnmarshaller.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolMessageUnmarshaller.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolMessageUnmarshaller.java
index c690e7b..e8910bd 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolMessageUnmarshaller.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolMessageUnmarshaller.java
@@ -21,18 +21,19 @@ import java.io.InputStream;
/**
* Defines an unmarshaller for deserializing protocol messages.
- *
+ *
* @param <T> The type of protocol message.
- *
- * @author unattributed
+ *
*/
public interface ProtocolMessageUnmarshaller<T> {
-
+
/**
* Deserializes a message on the given input stream.
+ *
* @param is an input stream
- * @return
- * @throws IOException if the message could not be deserialized from the stream
+ * @return deserialized message
+ * @throws IOException if the message could not be deserialized from the
+ * stream
*/
T unmarshal(InputStream is) throws IOException;
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/StandardDataFlow.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/StandardDataFlow.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/StandardDataFlow.java
index c2d16fc..0f0ed69 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/StandardDataFlow.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/StandardDataFlow.java
@@ -25,25 +25,25 @@ import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
import org.apache.nifi.cluster.protocol.jaxb.message.DataFlowAdapter;
/**
- * Represents a dataflow, which includes the raw bytes of the flow.xml and
+ * Represents a dataflow, which includes the raw bytes of the flow.xml and
* whether processors should be started automatically at application startup.
*/
@XmlJavaTypeAdapter(DataFlowAdapter.class)
public class StandardDataFlow implements Serializable, DataFlow {
-
+
private final byte[] flow;
private final byte[] templateBytes;
private final byte[] snippetBytes;
private boolean autoStartProcessors;
-
+
/**
- * Constructs an instance.
- *
+ * Constructs an instance.
+ *
* @param flow a valid flow as bytes, which cannot be null
* @param templateBytes an XML representation of templates
* @param snippetBytes an XML representation of snippets
- *
+ *
* @throws NullPointerException if any argument is null
*/
public StandardDataFlow(final byte[] flow, final byte[] templateBytes, final byte[] snippetBytes) {
@@ -51,20 +51,20 @@ public class StandardDataFlow implements Serializable, DataFlow {
this.templateBytes = templateBytes;
this.snippetBytes = snippetBytes;
}
-
+
public StandardDataFlow(final DataFlow toCopy) {
this.flow = copy(toCopy.getFlow());
this.templateBytes = copy(toCopy.getTemplates());
this.snippetBytes = copy(toCopy.getSnippets());
this.autoStartProcessors = toCopy.isAutoStartProcessors();
}
-
+
private static byte[] copy(final byte[] bytes) {
return bytes == null ? null : Arrays.copyOf(bytes, bytes.length);
}
-
+
/**
- * @return the raw byte array of the flow
+ * @return the raw byte array of the flow
*/
public byte[] getFlow() {
return flow;
@@ -76,26 +76,26 @@ public class StandardDataFlow implements Serializable, DataFlow {
public byte[] getTemplates() {
return templateBytes;
}
-
+
/**
* @return the raw byte array of the snippets
*/
public byte[] getSnippets() {
return snippetBytes;
}
-
+
/**
- * @return true if processors should be automatically started at application
- * startup; false otherwise
+ * @return true if processors should be automatically started at application
+ * startup; false otherwise
*/
public boolean isAutoStartProcessors() {
return autoStartProcessors;
}
-
+
/**
- *
+ *
* Sets the flag to automatically start processors at application startup.
- *
+ *
* @param autoStartProcessors true if processors should be automatically
* started at application startup; false otherwise
*/
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/UnknownServiceAddressException.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/UnknownServiceAddressException.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/UnknownServiceAddressException.java
index 41c74eb..dc22ba0 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/UnknownServiceAddressException.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/UnknownServiceAddressException.java
@@ -18,21 +18,22 @@ package org.apache.nifi.cluster.protocol;
/**
* Represents the exceptional case when a service's address is not known.
+ *
* @author unattributed
*/
public class UnknownServiceAddressException extends RuntimeException {
-
+
public UnknownServiceAddressException() {
}
-
+
public UnknownServiceAddressException(String msg) {
super(msg);
}
-
+
public UnknownServiceAddressException(Throwable cause) {
super(cause);
}
-
+
public UnknownServiceAddressException(String msg, Throwable cause) {
super(msg, cause);
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImpl.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImpl.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImpl.java
index ceb3fcb..636a6d3 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImpl.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImpl.java
@@ -43,34 +43,32 @@ import org.apache.nifi.util.FormatUtils;
/**
* A protocol sender for sending protocol messages from the cluster manager to
- * nodes.
- *
- * Connection-type requests (e.g., reconnection, disconnection) by nature of
- * starting/stopping flow controllers take longer than other types of protocol
- * messages. Therefore, a handshake timeout may be specified to lengthen the
+ * nodes.
+ *
+ * Connection-type requests (e.g., reconnection, disconnection) by nature of
+ * starting/stopping flow controllers take longer than other types of protocol
+ * messages. Therefore, a handshake timeout may be specified to lengthen the
* allowable time for communication with the node.
- *
- * @author unattributed
+ *
*/
public class ClusterManagerProtocolSenderImpl implements ClusterManagerProtocolSender {
-
private final ProtocolContext<ProtocolMessage> protocolContext;
private final SocketConfiguration socketConfiguration;
private int handshakeTimeoutSeconds;
private volatile BulletinRepository bulletinRepository;
public ClusterManagerProtocolSenderImpl(final SocketConfiguration socketConfiguration, final ProtocolContext<ProtocolMessage> protocolContext) {
- if(socketConfiguration == null) {
+ if (socketConfiguration == null) {
throw new IllegalArgumentException("Socket configuration may not be null.");
- } else if(protocolContext == null) {
+ } else if (protocolContext == null) {
throw new IllegalArgumentException("Protocol Context may not be null.");
}
this.socketConfiguration = socketConfiguration;
this.protocolContext = protocolContext;
this.handshakeTimeoutSeconds = -1; // less than zero denotes variable not configured
}
-
+
@Override
public void setBulletinRepository(final BulletinRepository bulletinRepository) {
this.bulletinRepository = bulletinRepository;
@@ -78,76 +76,79 @@ public class ClusterManagerProtocolSenderImpl implements ClusterManagerProtocolS
/**
* Requests the data flow from a node.
+ *
* @param msg a message
* @return the message response
- * @throws @throws ProtocolException if the message failed to be sent or the response was malformed
+ * @throws ProtocolException if the message failed to be sent or the
+ * response was malformed
*/
@Override
public FlowResponseMessage requestFlow(final FlowRequestMessage msg) throws ProtocolException {
Socket socket = null;
try {
- socket = createSocket(msg.getNodeId(), false);
-
+ socket = createSocket(msg.getNodeId(), false);
+
try {
// marshal message to output stream
final ProtocolMessageMarshaller<ProtocolMessage> marshaller = protocolContext.createMarshaller();
marshaller.marshal(msg, socket.getOutputStream());
- } catch(final IOException ioe) {
+ } catch (final IOException ioe) {
throw new ProtocolException("Failed marshalling '" + msg.getType() + "' protocol message due to: " + ioe, ioe);
}
-
+
final ProtocolMessage response;
try {
// unmarshall response and return
final ProtocolMessageUnmarshaller<ProtocolMessage> unmarshaller = protocolContext.createUnmarshaller();
response = unmarshaller.unmarshal(socket.getInputStream());
- } catch(final IOException ioe) {
+ } catch (final IOException ioe) {
throw new ProtocolException("Failed unmarshalling '" + MessageType.FLOW_RESPONSE + "' protocol message due to: " + ioe, ioe);
- }
-
- if(MessageType.FLOW_RESPONSE == response.getType()) {
+ }
+
+ if (MessageType.FLOW_RESPONSE == response.getType()) {
return (FlowResponseMessage) response;
} else {
throw new ProtocolException("Expected message type '" + MessageType.FLOW_RESPONSE + "' but found '" + response.getType() + "'");
}
-
+
} finally {
SocketUtils.closeQuietly(socket);
}
}
/**
- * Requests a node to reconnect to the cluster. The configured value for
+ * Requests a node to reconnect to the cluster. The configured value for
* handshake timeout is applied to the socket before making the request.
+ *
* @param msg a message
* @return the response
- * @throws ProtocolException if the message failed to be sent or the response was malformed
+ * @throws ProtocolException if the message failed to be sent or the
+ * response was malformed
*/
@Override
public ReconnectionResponseMessage requestReconnection(final ReconnectionRequestMessage msg) throws ProtocolException {
Socket socket = null;
try {
- socket = createSocket(msg.getNodeId(), true);
+ socket = createSocket(msg.getNodeId(), true);
// marshal message to output stream
try {
final ProtocolMessageMarshaller<ProtocolMessage> marshaller = protocolContext.createMarshaller();
marshaller.marshal(msg, socket.getOutputStream());
- } catch(final IOException ioe) {
+ } catch (final IOException ioe) {
throw new ProtocolException("Failed marshalling '" + msg.getType() + "' protocol message due to: " + ioe, ioe);
}
-
-
+
final ProtocolMessage response;
try {
// unmarshall response and return
final ProtocolMessageUnmarshaller<ProtocolMessage> unmarshaller = protocolContext.createUnmarshaller();
response = unmarshaller.unmarshal(socket.getInputStream());
- } catch(final IOException ioe) {
+ } catch (final IOException ioe) {
throw new ProtocolException("Failed unmarshalling '" + MessageType.RECONNECTION_RESPONSE + "' protocol message due to: " + ioe, ioe);
- }
-
- if(MessageType.RECONNECTION_RESPONSE == response.getType()) {
+ }
+
+ if (MessageType.RECONNECTION_RESPONSE == response.getType()) {
return (ReconnectionResponseMessage) response;
} else {
throw new ProtocolException("Expected message type '" + MessageType.FLOW_RESPONSE + "' but found '" + response.getType() + "'");
@@ -156,10 +157,11 @@ public class ClusterManagerProtocolSenderImpl implements ClusterManagerProtocolS
SocketUtils.closeQuietly(socket);
}
}
-
+
/**
- * Requests a node to disconnect from the cluster. The configured value for
+ * Requests a node to disconnect from the cluster. The configured value for
* handshake timeout is applied to the socket before making the request.
+ *
* @param msg a message
* @throws ProtocolException if the message failed to be sent
*/
@@ -167,13 +169,13 @@ public class ClusterManagerProtocolSenderImpl implements ClusterManagerProtocolS
public void disconnect(final DisconnectMessage msg) throws ProtocolException {
Socket socket = null;
try {
- socket = createSocket(msg.getNodeId(), true);
+ socket = createSocket(msg.getNodeId(), true);
// marshal message to output stream
try {
final ProtocolMessageMarshaller<ProtocolMessage> marshaller = protocolContext.createMarshaller();
marshaller.marshal(msg, socket.getOutputStream());
- } catch(final IOException ioe) {
+ } catch (final IOException ioe) {
throw new ProtocolException("Failed marshalling '" + msg.getType() + "' protocol message due to: " + ioe, ioe);
}
} finally {
@@ -183,37 +185,36 @@ public class ClusterManagerProtocolSenderImpl implements ClusterManagerProtocolS
/**
* Assigns the primary role to a node.
- *
+ *
* @param msg a message
- *
+ *
* @throws ProtocolException if the message failed to be sent
*/
@Override
public void assignPrimaryRole(final PrimaryRoleAssignmentMessage msg) throws ProtocolException {
Socket socket = null;
try {
- socket = createSocket(msg.getNodeId(), true);
+ socket = createSocket(msg.getNodeId(), true);
try {
// marshal message to output stream
final ProtocolMessageMarshaller<ProtocolMessage> marshaller = protocolContext.createMarshaller();
marshaller.marshal(msg, socket.getOutputStream());
- } catch(final IOException ioe) {
+ } catch (final IOException ioe) {
throw new ProtocolException("Failed marshalling '" + msg.getType() + "' protocol message due to: " + ioe, ioe);
}
} finally {
SocketUtils.closeQuietly(socket);
}
}
-
-
+
private void setConnectionHandshakeTimeoutOnSocket(final Socket socket) throws SocketException {
// update socket timeout, if handshake timeout was set; otherwise use socket's current timeout
- if(handshakeTimeoutSeconds >= 0) {
+ if (handshakeTimeoutSeconds >= 0) {
socket.setSoTimeout(handshakeTimeoutSeconds * 1000);
- }
+ }
}
-
+
public SocketConfiguration getSocketConfiguration() {
return socketConfiguration;
}
@@ -227,18 +228,18 @@ public class ClusterManagerProtocolSenderImpl implements ClusterManagerProtocolS
}
private Socket createSocket(final NodeIdentifier nodeId, final boolean applyHandshakeTimeout) {
- return createSocket(nodeId.getSocketAddress(), nodeId.getSocketPort(), applyHandshakeTimeout);
+ return createSocket(nodeId.getSocketAddress(), nodeId.getSocketPort(), applyHandshakeTimeout);
}
-
+
private Socket createSocket(final String host, final int port, final boolean applyHandshakeTimeout) {
- try {
+ try {
// create a socket
final Socket socket = SocketUtils.createSocket(InetSocketAddress.createUnresolved(host, port), socketConfiguration);
- if ( applyHandshakeTimeout ) {
- setConnectionHandshakeTimeoutOnSocket(socket);
+ if (applyHandshakeTimeout) {
+ setConnectionHandshakeTimeoutOnSocket(socket);
}
return socket;
- } catch(final IOException ioe) {
+ } catch (final IOException ioe) {
throw new ProtocolException("Failed to create socket due to: " + ioe, ioe);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderListener.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderListener.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderListener.java
index 933e5fa..2837f1a 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderListener.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderListener.java
@@ -32,21 +32,21 @@ import org.apache.nifi.cluster.protocol.message.ReconnectionResponseMessage;
import org.apache.nifi.reporting.BulletinRepository;
/**
- * A wrapper class for consolidating a protocol sender and listener for the cluster
- * manager.
- *
+ * A wrapper class for consolidating a protocol sender and listener for the
+ * cluster manager.
+ *
* @author unattributed
*/
public class ClusterManagerProtocolSenderListener implements ClusterManagerProtocolSender, ProtocolListener {
-
+
private final ClusterManagerProtocolSender sender;
-
+
private final ProtocolListener listener;
-
+
public ClusterManagerProtocolSenderListener(final ClusterManagerProtocolSender sender, final ProtocolListener listener) {
- if(sender == null) {
+ if (sender == null) {
throw new IllegalArgumentException("ClusterManagerProtocolSender may not be null.");
- } else if(listener == null) {
+ } else if (listener == null) {
throw new IllegalArgumentException("ProtocolListener may not be null.");
}
this.sender = sender;
@@ -55,7 +55,7 @@ public class ClusterManagerProtocolSenderListener implements ClusterManagerProto
@Override
public void stop() throws IOException {
- if(!isRunning()) {
+ if (!isRunning()) {
throw new IllegalStateException("Instance is already stopped.");
}
listener.stop();
@@ -63,7 +63,7 @@ public class ClusterManagerProtocolSenderListener implements ClusterManagerProto
@Override
public void start() throws IOException {
- if(isRunning()) {
+ if (isRunning()) {
throw new IllegalStateException("Instance is already started.");
}
listener.start();
@@ -88,13 +88,13 @@ public class ClusterManagerProtocolSenderListener implements ClusterManagerProto
public void addHandler(final ProtocolHandler handler) {
listener.addHandler(handler);
}
-
+
@Override
public void setBulletinRepository(final BulletinRepository bulletinRepository) {
listener.setBulletinRepository(bulletinRepository);
sender.setBulletinRepository(bulletinRepository);
}
-
+
@Override
public FlowResponseMessage requestFlow(final FlowRequestMessage msg) throws ProtocolException {
return sender.requestFlow(msg);
@@ -109,10 +109,10 @@ public class ClusterManagerProtocolSenderListener implements ClusterManagerProto
public void disconnect(DisconnectMessage msg) throws ProtocolException {
sender.disconnect(msg);
}
-
+
@Override
public void assignPrimaryRole(PrimaryRoleAssignmentMessage msg) throws ProtocolException {
sender.assignPrimaryRole(msg);
}
-
+
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceDiscovery.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceDiscovery.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceDiscovery.java
index 24e51e0..f808c83 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceDiscovery.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceDiscovery.java
@@ -45,7 +45,6 @@ import org.slf4j.LoggerFactory;
* discovery. The instance must be stopped before termination of the JVM to
* ensure proper resource clean-up.
*
- * @author unattributed
*/
public class ClusterServiceDiscovery implements MulticastServiceDiscovery, ProtocolListener {
@@ -60,7 +59,6 @@ public class ClusterServiceDiscovery implements MulticastServiceDiscovery, Proto
*/
private DiscoverableService service;
-
public ClusterServiceDiscovery(final String serviceName, final InetSocketAddress multicastAddress,
final MulticastConfiguration multicastConfiguration, final ProtocolContext<ProtocolMessage> protocolContext) {
@@ -162,7 +160,8 @@ public class ClusterServiceDiscovery implements MulticastServiceDiscovery, Proto
|| broadcastMsg.getPort() != oldService.getServiceAddress().getPort()) {
service = new DiscoverableServiceImpl(serviceName, InetSocketAddress.createUnresolved(broadcastMsg.getAddress(), broadcastMsg.getPort()));
final InetSocketAddress oldServiceAddress = (oldService == null) ? null : oldService.getServiceAddress();
- logger.info(String.format("Updating cluster service address for '%s' from '%s' to '%s'", serviceName, prettyPrint(oldServiceAddress), prettyPrint(service.getServiceAddress())));
+ logger.info(String.format("Updating cluster service address for '%s' from '%s' to '%s'",
+ serviceName, prettyPrint(oldServiceAddress), prettyPrint(service.getServiceAddress())));
}
}
return null;
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceLocator.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceLocator.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceLocator.java
index bebfde8..64ca7fa 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceLocator.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceLocator.java
@@ -27,39 +27,39 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Implements the ServiceLocator interface for locating the socket address
- * of a cluster service. Depending on configuration, the address may be located
- * using service discovery. If using service discovery, then the service methods
- * must be used for starting and stopping discovery.
- *
- * Service discovery may be used in conjunction with a fixed port. In this case,
- * the service discovery will yield the service IP/host while the fixed port will
- * be used for the port.
- *
+ * Implements the ServiceLocator interface for locating the socket address of a
+ * cluster service. Depending on configuration, the address may be located using
+ * service discovery. If using service discovery, then the service methods must
+ * be used for starting and stopping discovery.
+ *
+ * Service discovery may be used in conjunction with a fixed port. In this case,
+ * the service discovery will yield the service IP/host while the fixed port
+ * will be used for the port.
+ *
* Alternatively, the instance may be configured with exact service location, in
- * which case, no service discovery occurs and the caller will always receive the
- * configured service.
- *
+ * which case, no service discovery occurs and the caller will always receive
+ * the configured service.
+ *
* @author unattributed
*/
public class ClusterServiceLocator implements ServiceDiscovery {
-
+
private static final Logger logger = LoggerFactory.getLogger(ClusterServiceLocator.class);
-
+
private final String serviceName;
-
+
private final ClusterServiceDiscovery serviceDiscovery;
-
+
private final DiscoverableService fixedService;
private final int fixedServicePort;
-
+
private final AttemptsConfig attemptsConfig = new AttemptsConfig();
-
+
private final AtomicBoolean running = new AtomicBoolean(false);
-
+
public ClusterServiceLocator(final ClusterServiceDiscovery serviceDiscovery) {
- if(serviceDiscovery == null) {
+ if (serviceDiscovery == null) {
throw new IllegalArgumentException("ClusterServiceDiscovery may not be null.");
}
this.serviceDiscovery = serviceDiscovery;
@@ -67,9 +67,9 @@ public class ClusterServiceLocator implements ServiceDiscovery {
this.fixedServicePort = 0;
this.serviceName = serviceDiscovery.getServiceName();
}
-
+
public ClusterServiceLocator(final ClusterServiceDiscovery serviceDiscovery, final int fixedServicePort) {
- if(serviceDiscovery == null) {
+ if (serviceDiscovery == null) {
throw new IllegalArgumentException("ClusterServiceDiscovery may not be null.");
}
this.serviceDiscovery = serviceDiscovery;
@@ -77,9 +77,9 @@ public class ClusterServiceLocator implements ServiceDiscovery {
this.fixedServicePort = fixedServicePort;
this.serviceName = serviceDiscovery.getServiceName();
}
-
+
public ClusterServiceLocator(final DiscoverableService fixedService) {
- if(fixedService == null) {
+ if (fixedService == null) {
throw new IllegalArgumentException("Service may not be null.");
}
this.serviceDiscovery = null;
@@ -87,30 +87,30 @@ public class ClusterServiceLocator implements ServiceDiscovery {
this.fixedServicePort = 0;
this.serviceName = fixedService.getServiceName();
}
-
+
@Override
public DiscoverableService getService() {
-
+
final int numAttemptsValue;
final int secondsBetweenAttempts;
- synchronized(this) {
+ synchronized (this) {
numAttemptsValue = attemptsConfig.numAttempts;
secondsBetweenAttempts = attemptsConfig.getTimeBetweenAttempts();
}
-
+
// try for a configured amount of attempts to retrieve the service address
- for(int i = 0; i < numAttemptsValue; i++) {
+ for (int i = 0; i < numAttemptsValue; i++) {
- if(fixedService != null) {
+ if (fixedService != null) {
return fixedService;
- } else if(serviceDiscovery != null) {
-
+ } else if (serviceDiscovery != null) {
+
final DiscoverableService discoveredService = serviceDiscovery.getService();
-
+
// if we received an address
- if(discoveredService != null) {
+ if (discoveredService != null) {
// if we were configured with a fixed port, then use the discovered host and fixed port; otherwise use the discovered address
- if(fixedServicePort > 0) {
+ if (fixedServicePort > 0) {
// create service using discovered service name and address with fixed service port
final InetSocketAddress addr = InetSocketAddress.createUnresolved(discoveredService.getServiceAddress().getHostName(), fixedServicePort);
final DiscoverableService result = new DiscoverableServiceImpl(discoveredService.getServiceName(), addr);
@@ -120,23 +120,23 @@ public class ClusterServiceLocator implements ServiceDiscovery {
}
}
}
-
+
// could not obtain service address, so sleep a bit
try {
- logger.debug(String.format("Locating Cluster Service '%s' Attempt: %d of %d failed. Trying again in %d seconds.",
- serviceName, (i + 1), numAttemptsValue, secondsBetweenAttempts));
+ logger.debug(String.format("Locating Cluster Service '%s' Attempt: %d of %d failed. Trying again in %d seconds.",
+ serviceName, (i + 1), numAttemptsValue, secondsBetweenAttempts));
Thread.sleep(secondsBetweenAttempts * 1000);
- } catch(final InterruptedException ie) {
+ } catch (final InterruptedException ie) {
break;
}
-
+
}
return null;
}
public boolean isRunning() {
- if(serviceDiscovery != null) {
+ if (serviceDiscovery != null) {
return serviceDiscovery.isRunning();
} else {
return running.get();
@@ -144,31 +144,31 @@ public class ClusterServiceLocator implements ServiceDiscovery {
}
public void start() throws IOException {
-
- if(isRunning()) {
+
+ if (isRunning()) {
throw new IllegalStateException("Instance is already started.");
}
-
- if(serviceDiscovery != null) {
+
+ if (serviceDiscovery != null) {
serviceDiscovery.start();
}
running.set(true);
}
public void stop() throws IOException {
-
- if(isRunning() == false) {
+
+ if (isRunning() == false) {
throw new IllegalStateException("Instance is already stopped.");
}
-
- if(serviceDiscovery != null) {
+
+ if (serviceDiscovery != null) {
serviceDiscovery.stop();
}
running.set(false);
}
-
+
public synchronized void setAttemptsConfig(final AttemptsConfig config) {
- if(config == null) {
+ if (config == null) {
throw new IllegalArgumentException("Attempts configuration may not be null.");
}
this.attemptsConfig.numAttempts = config.numAttempts;
@@ -183,21 +183,21 @@ public class ClusterServiceLocator implements ServiceDiscovery {
config.timeBetweenAttempsUnit = this.attemptsConfig.timeBetweenAttempsUnit;
return config;
}
-
+
public static class AttemptsConfig {
-
+
private int numAttempts = 1;
-
+
private int timeBetweenAttempts = 1;
-
+
private TimeUnit timeBetweenAttempsUnit = TimeUnit.SECONDS;
-
+
public int getNumAttempts() {
return numAttempts;
}
public void setNumAttempts(int numAttempts) {
- if(numAttempts <= 0) {
+ if (numAttempts <= 0) {
throw new IllegalArgumentException("Number of attempts must be positive: " + numAttempts);
}
this.numAttempts = numAttempts;
@@ -208,9 +208,9 @@ public class ClusterServiceLocator implements ServiceDiscovery {
}
public void setTimeBetweenAttempsUnit(TimeUnit timeBetweenAttempsUnit) {
- if(timeBetweenAttempts <= 0) {
+ if (timeBetweenAttempts <= 0) {
throw new IllegalArgumentException("Time between attempts must be positive: " + numAttempts);
- }
+ }
this.timeBetweenAttempsUnit = timeBetweenAttempsUnit;
}
@@ -219,9 +219,9 @@ public class ClusterServiceLocator implements ServiceDiscovery {
}
public void setTimeBetweenAttempts(int timeBetweenAttempts) {
- if(timeBetweenAttempts <= 0) {
- throw new IllegalArgumentException("Time between attempts must be positive: " + numAttempts);
- }
+ if (timeBetweenAttempts <= 0) {
+ throw new IllegalArgumentException("Time between attempts must be positive: " + numAttempts);
+ }
this.timeBetweenAttempts = timeBetweenAttempts;
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServicesBroadcaster.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServicesBroadcaster.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServicesBroadcaster.java
index e9e7d5b..3458760 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServicesBroadcaster.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServicesBroadcaster.java
@@ -21,7 +21,10 @@ import java.io.IOException;
import java.net.DatagramPacket;
import java.net.InetSocketAddress;
import java.net.MulticastSocket;
-import java.util.*;
+import java.util.Collections;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.TimeUnit;
@@ -39,75 +42,76 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Broadcasts services used by the clustering software using multicast communication.
- * A configurable delay occurs after broadcasting the collection of services.
- *
+ * Broadcasts services used by the clustering software using multicast
+ * communication. A configurable delay occurs after broadcasting the collection
+ * of services.
+ *
* The client caller is responsible for starting and stopping the broadcasting.
* The instance must be stopped before termination of the JVM to ensure proper
* resource clean-up.
- *
+ *
* @author unattributed
*/
public class ClusterServicesBroadcaster implements MulticastServicesBroadcaster {
-
+
private static final Logger logger = new NiFiLog(LoggerFactory.getLogger(ClusterServicesBroadcaster.class));
-
+
private final Set<DiscoverableService> services = new CopyOnWriteArraySet<>();
private final InetSocketAddress multicastAddress;
-
+
private final MulticastConfiguration multicastConfiguration;
-
+
private final ProtocolContext<ProtocolMessage> protocolContext;
-
+
private final int broadcastDelayMs;
-
+
private Timer broadcaster;
-
+
private MulticastSocket multicastSocket;
-
- public ClusterServicesBroadcaster(final InetSocketAddress multicastAddress,
- final MulticastConfiguration multicastConfiguration,
+
+ public ClusterServicesBroadcaster(final InetSocketAddress multicastAddress,
+ final MulticastConfiguration multicastConfiguration,
final ProtocolContext<ProtocolMessage> protocolContext, final String broadcastDelay) {
-
- if(multicastAddress == null) {
+
+ if (multicastAddress == null) {
throw new IllegalArgumentException("Multicast address may not be null.");
- } else if(multicastAddress.getAddress().isMulticastAddress() == false) {
+ } else if (multicastAddress.getAddress().isMulticastAddress() == false) {
throw new IllegalArgumentException("Multicast group address is not a Class D IP address.");
- } else if(protocolContext == null) {
+ } else if (protocolContext == null) {
throw new IllegalArgumentException("Protocol Context may not be null.");
- } else if(multicastConfiguration == null) {
+ } else if (multicastConfiguration == null) {
throw new IllegalArgumentException("Multicast configuration may not be null.");
}
-
+
this.services.addAll(services);
this.multicastAddress = multicastAddress;
this.multicastConfiguration = multicastConfiguration;
this.protocolContext = protocolContext;
this.broadcastDelayMs = (int) FormatUtils.getTimeDuration(broadcastDelay, TimeUnit.MILLISECONDS);
}
-
+
public void start() throws IOException {
- if(isRunning()) {
+ if (isRunning()) {
throw new IllegalStateException("Instance is already started.");
}
-
+
// setup socket
multicastSocket = MulticastUtils.createMulticastSocket(multicastConfiguration);
-
+
// setup broadcaster
broadcaster = new Timer("Cluster Services Broadcaster", /* is daemon */ true);
broadcaster.schedule(new TimerTask() {
@Override
public void run() {
- for(final DiscoverableService service : services) {
+ for (final DiscoverableService service : services) {
try {
final InetSocketAddress serviceAddress = service.getServiceAddress();
- logger.debug(String.format("Broadcasting Cluster Service '%s' at address %s:%d",
- service.getServiceName(), serviceAddress.getHostName(), serviceAddress.getPort()));
-
+ logger.debug(String.format("Broadcasting Cluster Service '%s' at address %s:%d",
+ service.getServiceName(), serviceAddress.getHostName(), serviceAddress.getPort()));
+
// create message
final ServiceBroadcastMessage msg = new ServiceBroadcastMessage();
msg.setServiceName(service.getServiceName());
@@ -124,37 +128,37 @@ public class ClusterServicesBroadcaster implements MulticastServicesBroadcaster
final DatagramPacket packet = new DatagramPacket(packetBytes, packetBytes.length, multicastAddress);
multicastSocket.send(packet);
- } catch(final Exception ex) {
+ } catch (final Exception ex) {
logger.warn(String.format("Cluster Services Broadcaster failed broadcasting service '%s' due to: %s", service.getServiceName(), ex), ex);
}
}
}
}, 0, broadcastDelayMs);
}
-
+
public boolean isRunning() {
return (broadcaster != null);
}
-
+
public void stop() {
-
- if(isRunning() == false) {
+
+ if (isRunning() == false) {
throw new IllegalStateException("Instance is already stopped.");
}
-
+
broadcaster.cancel();
broadcaster = null;
// close socket
MulticastUtils.closeQuietly(multicastSocket);
-
+
}
@Override
public int getBroadcastDelayMs() {
return broadcastDelayMs;
}
-
+
@Override
public Set<DiscoverableService> getServices() {
return Collections.unmodifiableSet(services);
@@ -164,16 +168,16 @@ public class ClusterServicesBroadcaster implements MulticastServicesBroadcaster
public InetSocketAddress getMulticastAddress() {
return multicastAddress;
}
-
+
@Override
public boolean addService(final DiscoverableService service) {
return services.add(service);
}
-
+
@Override
public boolean removeService(final String serviceName) {
- for(final DiscoverableService service : services) {
- if(service.getServiceName().equals(serviceName)) {
+ for (final DiscoverableService service : services) {
+ if (service.getServiceName().equals(serviceName)) {
return services.remove(service);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/CopyingInputStream.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/CopyingInputStream.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/CopyingInputStream.java
index 680df65..7ac17ab 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/CopyingInputStream.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/CopyingInputStream.java
@@ -23,6 +23,7 @@ import java.io.InputStream;
import java.io.OutputStream;
public class CopyingInputStream extends FilterInputStream {
+
private final ByteArrayOutputStream baos = new ByteArrayOutputStream();
private final int maxBytesToCopy;
private final InputStream in;
@@ -32,45 +33,45 @@ public class CopyingInputStream extends FilterInputStream {
this.maxBytesToCopy = maxBytesToCopy;
this.in = in;
}
-
+
@Override
public int read() throws IOException {
final int delegateRead = in.read();
- if ( delegateRead != -1 && getNumberOfBytesCopied() < maxBytesToCopy ) {
+ if (delegateRead != -1 && getNumberOfBytesCopied() < maxBytesToCopy) {
baos.write(delegateRead);
}
-
+
return delegateRead;
}
-
+
@Override
public int read(byte[] b) throws IOException {
final int delegateRead = in.read(b);
- if ( delegateRead >= 0 ) {
+ if (delegateRead >= 0) {
baos.write(b, 0, Math.min(delegateRead, maxBytesToCopy - getNumberOfBytesCopied()));
}
-
+
return delegateRead;
}
-
+
@Override
public int read(byte[] b, int off, int len) throws IOException {
final int delegateRead = in.read(b, off, len);
- if ( delegateRead >= 0 ) {
+ if (delegateRead >= 0) {
baos.write(b, off, Math.min(delegateRead, maxBytesToCopy - getNumberOfBytesCopied()));
}
-
+
return delegateRead;
}
-
+
public byte[] getBytesRead() {
return baos.toByteArray();
}
-
+
public void writeBytes(final OutputStream out) throws IOException {
baos.writeTo(out);
}
-
+
public int getNumberOfBytesCopied() {
return baos.size();
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/MulticastProtocolListener.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/MulticastProtocolListener.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/MulticastProtocolListener.java
index d3764b3..90f9124 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/MulticastProtocolListener.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/MulticastProtocolListener.java
@@ -45,20 +45,20 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Implements a listener for protocol messages sent over multicast. If a message
+ * Implements a listener for protocol messages sent over multicast. If a message
* is of type MulticastProtocolMessage, then the underlying protocol message is
- * passed to the handler. If the receiving handler produces a message response,
- * then the message is wrapped with a MulticastProtocolMessage before being
- * sent to the originator.
- *
- * The client caller is responsible for starting and stopping the listener.
- * The instance must be stopped before termination of the JVM to ensure proper
+ * passed to the handler. If the receiving handler produces a message response,
+ * then the message is wrapped with a MulticastProtocolMessage before being sent
+ * to the originator.
+ *
+ * The client caller is responsible for starting and stopping the listener. The
+ * instance must be stopped before termination of the JVM to ensure proper
* resource clean-up.
- *
+ *
* @author unattributed
*/
public class MulticastProtocolListener extends MulticastListener implements ProtocolListener {
-
+
private static final Logger logger = new NiFiLog(LoggerFactory.getLogger(MulticastProtocolListener.class));
// immutable members
@@ -74,7 +74,7 @@ public class MulticastProtocolListener extends MulticastListener implements Prot
final ProtocolContext<ProtocolMessage> protocolContext) {
super(numThreads, multicastAddress, configuration);
-
+
if (protocolContext == null) {
throw new IllegalArgumentException("Protocol Context may not be null.");
}
@@ -89,21 +89,21 @@ public class MulticastProtocolListener extends MulticastListener implements Prot
@Override
public void start() throws IOException {
- if(super.isRunning()) {
+ if (super.isRunning()) {
throw new IllegalStateException("Instance is already started.");
}
-
+
super.start();
-
+
}
@Override
public void stop() throws IOException {
- if(super.isRunning() == false) {
+ if (super.isRunning() == false) {
throw new IllegalStateException("Instance is already stopped.");
}
-
+
// shutdown listener
super.stop();
@@ -116,17 +116,17 @@ public class MulticastProtocolListener extends MulticastListener implements Prot
@Override
public void addHandler(final ProtocolHandler handler) {
- if(handler == null) {
+ if (handler == null) {
throw new NullPointerException("Protocol handler may not be null.");
}
handlers.add(handler);
}
-
+
@Override
public boolean removeHandler(final ProtocolHandler handler) {
return handlers.remove(handler);
}
-
+
@Override
public void dispatchRequest(final MulticastSocket multicastSocket, final DatagramPacket packet) {
@@ -138,10 +138,10 @@ public class MulticastProtocolListener extends MulticastListener implements Prot
// unwrap multicast message, if necessary
final ProtocolMessage unwrappedRequest;
- if(request instanceof MulticastProtocolMessage) {
+ if (request instanceof MulticastProtocolMessage) {
final MulticastProtocolMessage multicastRequest = (MulticastProtocolMessage) request;
// don't process a message we sent
- if(listenerId.equals(multicastRequest.getId())) {
+ if (listenerId.equals(multicastRequest.getId())) {
return;
} else {
unwrappedRequest = multicastRequest.getProtocolMessage();
@@ -149,7 +149,7 @@ public class MulticastProtocolListener extends MulticastListener implements Prot
} else {
unwrappedRequest = request;
}
-
+
// dispatch message to handler
ProtocolHandler desiredHandler = null;
for (final ProtocolHandler handler : getHandlers()) {
@@ -164,28 +164,28 @@ public class MulticastProtocolListener extends MulticastListener implements Prot
throw new ProtocolException("No handler assigned to handle message type: " + request.getType());
} else {
final ProtocolMessage response = desiredHandler.handle(request);
- if(response != null) {
+ if (response != null) {
try {
-
+
// wrap with listener id
final MulticastProtocolMessage multicastResponse = new MulticastProtocolMessage(listenerId, response);
-
+
// marshal message
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
final ProtocolMessageMarshaller<ProtocolMessage> marshaller = protocolContext.createMarshaller();
marshaller.marshal(multicastResponse, baos);
final byte[] responseBytes = baos.toByteArray();
-
+
final int maxPacketSizeBytes = getMaxPacketSizeBytes();
- if(responseBytes.length > maxPacketSizeBytes) {
- logger.warn("Cluster protocol handler '" + desiredHandler.getClass() +
- "' produced a multicast response with length greater than configured max packet size '" + maxPacketSizeBytes + "'");
+ if (responseBytes.length > maxPacketSizeBytes) {
+ logger.warn("Cluster protocol handler '" + desiredHandler.getClass()
+ + "' produced a multicast response with length greater than configured max packet size '" + maxPacketSizeBytes + "'");
}
-
+
// create and send packet
- final DatagramPacket responseDatagram = new DatagramPacket(responseBytes, responseBytes.length, getMulticastAddress().getAddress(), getMulticastAddress().getPort());
+ final DatagramPacket responseDatagram = new DatagramPacket(responseBytes, responseBytes.length, getMulticastAddress().getAddress(), getMulticastAddress().getPort());
multicastSocket.send(responseDatagram);
-
+
} catch (final IOException ioe) {
throw new ProtocolException("Failed marshalling protocol message in response to message type: " + request.getType() + " due to: " + ioe, ioe);
}
@@ -194,8 +194,8 @@ public class MulticastProtocolListener extends MulticastListener implements Prot
} catch (final Throwable t) {
logger.warn("Failed processing protocol message due to " + t, t);
-
- if ( bulletinRepository != null ) {
+
+ if (bulletinRepository != null) {
final Bulletin bulletin = BulletinFactory.createBulletin("Clustering", "WARNING", "Failed to process Protocol Message due to " + t.toString());
bulletinRepository.addBulletin(bulletin);
}
|