nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From marka...@apache.org
Subject [4/7] incubator-nifi git commit: NIFI-567: Use batch duration of 500 ms instead of 5 seconds when sending via site-to-site
Date Thu, 30 Apr 2015 23:41:22 GMT
NIFI-567: Use batch duration of 500 ms instead of 5 seconds when sending via site-to-site


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/e7954cf0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/e7954cf0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/e7954cf0

Branch: refs/heads/develop
Commit: e7954cf04a8523e393327f5391dbc5209d887e98
Parents: e30cd23
Author: Mark Payne <markap14@hotmail.com>
Authored: Thu Apr 30 13:12:40 2015 -0400
Committer: Mark Payne <markap14@hotmail.com>
Committed: Thu Apr 30 13:12:40 2015 -0400

----------------------------------------------------------------------
 .../nifi/remote/StandardRemoteGroupPort.java    | 40 ++++++++++----------
 1 file changed, 20 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e7954cf0/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
index 982d9ff..773f9cf 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
@@ -57,7 +57,7 @@ import org.slf4j.LoggerFactory;
 
 public class StandardRemoteGroupPort extends RemoteGroupPort {
 
-    private static final long BATCH_SEND_NANOS = TimeUnit.SECONDS.toNanos(5L); // send batches of up to 5 seconds
+    private static final long BATCH_SEND_NANOS = TimeUnit.MILLISECONDS.toNanos(500L); // send batches of up to 500 millis
     public static final String USER_AGENT = "NiFi-Site-to-Site";
     public static final String CONTENT_TYPE = "application/octet-stream";
 
@@ -98,7 +98,7 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
         return targetRunning.get();
     }
 
-    public void setTargetRunning(boolean targetRunning) {
+    public void setTargetRunning(final boolean targetRunning) {
         this.targetRunning.set(targetRunning);
     }
 
@@ -126,12 +126,12 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
         super.onSchedulingStart();
 
         final SiteToSiteClient client = new SiteToSiteClient.Builder()
-                .url(remoteGroup.getTargetUri().toString())
-                .portIdentifier(getIdentifier())
-                .sslContext(sslContext)
-                .eventReporter(remoteGroup.getEventReporter())
-                .peerPersistenceFile(getPeerPersistenceFile(getIdentifier()))
-                .build();
+        .url(remoteGroup.getTargetUri().toString())
+        .portIdentifier(getIdentifier())
+        .sslContext(sslContext)
+        .eventReporter(remoteGroup.getEventReporter())
+        .peerPersistenceFile(getPeerPersistenceFile(getIdentifier()))
+        .build();
         clientRef.set(client);
     }
 
@@ -147,7 +147,7 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
             return;
         }
 
-        String url = getRemoteProcessGroup().getTargetUri().toString();
+        final String url = getRemoteProcessGroup().getTargetUri().toString();
 
         // If we are sending data, we need to ensure that we have at least 1 FlowFile to send. Otherwise,
         // we don't want to create a transaction at all.
@@ -230,7 +230,7 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
         return remoteGroup.getYieldDuration();
     }
 
-    private int transferFlowFiles(final Transaction transaction, final ProcessContext context, final ProcessSession session, FlowFile firstFlowFile) throws IOException, ProtocolException {
+    private int transferFlowFiles(final Transaction transaction, final ProcessContext context, final ProcessSession session, final FlowFile firstFlowFile) throws IOException, ProtocolException {
         FlowFile flowFile = firstFlowFile;
 
         try {
@@ -288,7 +288,7 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
 
             final String flowFileDescription = (flowFilesSent.size() < 20) ? flowFilesSent.toString() : flowFilesSent.size() + " FlowFiles";
             logger.info("{} Successfully sent {} ({}) to {} in {} milliseconds at a rate of {}", new Object[]{
-                this, flowFileDescription, dataSize, transaction.getCommunicant().getUrl(), uploadMillis, uploadDataRate});
+                    this, flowFileDescription, dataSize, transaction.getCommunicant().getUrl(), uploadMillis, uploadDataRate});
 
             return flowFilesSent.size();
         } catch (final Exception e) {
@@ -345,7 +345,7 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
             final long uploadMillis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
             final String dataSize = FormatUtils.formatDataSize(bytesReceived);
             logger.info("{} Successfully receveied {} ({}) from {} in {} milliseconds at a rate of {}", new Object[]{
-                this, flowFileDescription, dataSize, transaction.getCommunicant().getUrl(), uploadMillis, uploadDataRate});
+                    this, flowFileDescription, dataSize, transaction.getCommunicant().getUrl(), uploadMillis, uploadDataRate});
         }
 
         return flowFilesReceived.size();
@@ -367,16 +367,16 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
         ValidationResult error = null;
         if (!targetExists.get()) {
             error = new ValidationResult.Builder()
-                    .explanation(String.format("Remote instance indicates that port '%s' no longer exists.", getName()))
-                    .subject(String.format("Remote port '%s'", getName()))
-                    .valid(false)
-                    .build();
+            .explanation(String.format("Remote instance indicates that port '%s' no longer exists.", getName()))
+            .subject(String.format("Remote port '%s'", getName()))
+            .valid(false)
+            .build();
         } else if (getConnectableType() == ConnectableType.REMOTE_OUTPUT_PORT && getConnections(Relationship.ANONYMOUS).isEmpty()) {
             error = new ValidationResult.Builder()
-                    .explanation(String.format("Port '%s' has no outbound connections", getName()))
-                    .subject(String.format("Remote port '%s'", getName()))
-                    .valid(false)
-                    .build();
+            .explanation(String.format("Port '%s' has no outbound connections", getName()))
+            .subject(String.format("Remote port '%s'", getName()))
+            .valid(false)
+            .build();
         }
 
         if (error != null) {


Mime
View raw message