nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From marka...@apache.org
Subject [06/10] nifi git commit: NIFI-1857: HTTPS Site-to-Site
Date Thu, 09 Jun 2016 19:45:28 GMT
http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestEndpointConnectionStatePool.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestEndpointConnectionStatePool.java b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestEndpointConnectionStatePool.java
deleted file mode 100644
index 8336745..0000000
--- a/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestEndpointConnectionStatePool.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.client.socket;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.nifi.remote.PeerStatus;
-import org.apache.nifi.remote.TransferDirection;
-import org.apache.nifi.remote.cluster.ClusterNodeInformation;
-import org.apache.nifi.remote.cluster.NodeInformation;
-import org.junit.Test;
-
-public class TestEndpointConnectionStatePool {
-
-    @Test
-    public void testFormulateDestinationListForOutput() throws IOException {
-        final ClusterNodeInformation clusterNodeInfo = new ClusterNodeInformation();
-        final List<NodeInformation> collection = new ArrayList<>();
-        collection.add(new NodeInformation("ShouldGetMedium", 1, 1111, true, 4096));
-        collection.add(new NodeInformation("ShouldGetLots", 2, 2222, true, 10240));
-        collection.add(new NodeInformation("ShouldGetLittle", 3, 3333, true, 1024));
-        collection.add(new NodeInformation("ShouldGetMedium", 4, 4444, true, 4096));
-        collection.add(new NodeInformation("ShouldGetMedium", 5, 5555, true, 4096));
-
-        clusterNodeInfo.setNodeInformation(collection);
-        final List<PeerStatus> destinations = EndpointConnectionPool.formulateDestinationList(clusterNodeInfo, TransferDirection.RECEIVE);
-        for (final PeerStatus peerStatus : destinations) {
-            System.out.println(peerStatus.getPeerDescription());
-        }
-    }
-
-    @Test
-    public void testFormulateDestinationListForOutputHugeDifference() throws IOException {
-        final ClusterNodeInformation clusterNodeInfo = new ClusterNodeInformation();
-        final List<NodeInformation> collection = new ArrayList<>();
-        collection.add(new NodeInformation("ShouldGetLittle", 1, 1111, true, 500));
-        collection.add(new NodeInformation("ShouldGetLots", 2, 2222, true, 50000));
-
-        clusterNodeInfo.setNodeInformation(collection);
-        final List<PeerStatus> destinations = EndpointConnectionPool.formulateDestinationList(clusterNodeInfo, TransferDirection.RECEIVE);
-        for (final PeerStatus peerStatus : destinations) {
-            System.out.println(peerStatus.getPeerDescription());
-        }
-    }
-
-    @Test
-    public void testFormulateDestinationListForInputPorts() throws IOException {
-        final ClusterNodeInformation clusterNodeInfo = new ClusterNodeInformation();
-        final List<NodeInformation> collection = new ArrayList<>();
-        collection.add(new NodeInformation("ShouldGetMedium", 1, 1111, true, 4096));
-        collection.add(new NodeInformation("ShouldGetLittle", 2, 2222, true, 10240));
-        collection.add(new NodeInformation("ShouldGetLots", 3, 3333, true, 1024));
-        collection.add(new NodeInformation("ShouldGetMedium", 4, 4444, true, 4096));
-        collection.add(new NodeInformation("ShouldGetMedium", 5, 5555, true, 4096));
-
-        clusterNodeInfo.setNodeInformation(collection);
-        final List<PeerStatus> destinations = EndpointConnectionPool.formulateDestinationList(clusterNodeInfo, TransferDirection.SEND);
-        for (final PeerStatus peerStatus : destinations) {
-            System.out.println(peerStatus.getPeerDescription());
-        }
-    }
-
-    @Test
-    public void testFormulateDestinationListForInputPortsHugeDifference() throws IOException {
-        final ClusterNodeInformation clusterNodeInfo = new ClusterNodeInformation();
-        final List<NodeInformation> collection = new ArrayList<>();
-        collection.add(new NodeInformation("ShouldGetLots", 1, 1111, true, 500));
-        collection.add(new NodeInformation("ShouldGetLittle", 2, 2222, true, 50000));
-
-        clusterNodeInfo.setNodeInformation(collection);
-        final List<PeerStatus> destinations = EndpointConnectionPool.formulateDestinationList(clusterNodeInfo, TransferDirection.SEND);
-        for (final PeerStatus peerStatus : destinations) {
-            System.out.println(peerStatus.getPeerDescription());
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/protocol/SiteToSiteTestUtils.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/protocol/SiteToSiteTestUtils.java b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/protocol/SiteToSiteTestUtils.java
new file mode 100644
index 0000000..90e8f55
--- /dev/null
+++ b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/protocol/SiteToSiteTestUtils.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.protocol;
+
+import org.apache.nifi.remote.Transaction;
+import org.apache.nifi.remote.TransactionCompletion;
+import org.apache.nifi.remote.util.StandardDataPacket;
+import org.apache.nifi.stream.io.ByteArrayInputStream;
+import org.apache.nifi.stream.io.ByteArrayOutputStream;
+import org.apache.nifi.stream.io.StreamUtils;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.HashMap;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class SiteToSiteTestUtils {
+    /**
+     * Wraps the UTF-8 bytes of the given text in a {@link StandardDataPacket}.
+     * The packet is created with an empty map as its first constructor argument
+     * (presumably the attributes - confirm against StandardDataPacket) and the
+     * encoded byte length as its size.
+     *
+     * @param contents text to encode as the packet content
+     * @return the newly created packet
+     * @throws RuntimeException wrapping the UnsupportedEncodingException if "UTF-8" is rejected
+     */
+    public static DataPacket createDataPacket(String contents) {
+        final byte[] payload;
+        try {
+            payload = contents.getBytes("UTF-8");
+        } catch (UnsupportedEncodingException unsupported) {
+            throw new RuntimeException(unsupported);
+        }
+        return new StandardDataPacket(new HashMap<>(), new ByteArrayInputStream(payload), payload.length);
+    }
+
+    /**
+     * Copies the packet's data stream into memory and decodes it as UTF-8 text.
+     *
+     * @param packet packet whose data is read; its size is used as the initial buffer capacity
+     * @return the packet content decoded with the "UTF-8" charset
+     * @throws IOException if copying the data fails or the charset is not supported
+     */
+    public static String readContents(DataPacket packet) throws IOException {
+        final int initialCapacity = (int) packet.getSize();
+        final ByteArrayOutputStream buffer = new ByteArrayOutputStream(initialCapacity);
+        StreamUtils.copy(packet.getData(), buffer);
+        final byte[] raw = buffer.toByteArray();
+        return new String(raw, "UTF-8");
+    }
+
+    /**
+     * Runs a receive scenario in which the first {@code receive()} already returns {@code null},
+     * then confirms and completes the transaction. Asserts the state after each step, that the
+     * completion is not a backoff and that zero data packets were transferred.
+     *
+     * @param transaction transaction under test, expected to be in TRANSACTION_STARTED state
+     * @throws IOException if receive, confirm or complete fails
+     */
+    public static void execReceiveZeroFlowFile(Transaction transaction) throws IOException {
+        assertEquals(Transaction.TransactionState.TRANSACTION_STARTED, transaction.getState());
+
+        // No packet is expected at all.
+        assertNull(transaction.receive());
+
+        transaction.confirm();
+        assertEquals(Transaction.TransactionState.TRANSACTION_CONFIRMED, transaction.getState());
+
+        final TransactionCompletion result = transaction.complete();
+        assertEquals(Transaction.TransactionState.TRANSACTION_COMPLETED, transaction.getState());
+        assertFalse("Should NOT be backoff", result.isBackoff());
+        assertEquals(0, result.getDataPacketsTransferred());
+    }
+
+    /**
+     * Runs a receive scenario that expects exactly one packet with the content
+     * "contents on server 1", followed by {@code null}, then confirms and completes.
+     * Asserts the state after each step, that the completion is not a backoff and that
+     * one data packet was transferred.
+     *
+     * @param transaction transaction under test, expected to be in TRANSACTION_STARTED state
+     * @throws IOException if receive, confirm or complete fails
+     */
+    public static void execReceiveOneFlowFile(Transaction transaction) throws IOException {
+        assertEquals(Transaction.TransactionState.TRANSACTION_STARTED, transaction.getState());
+
+        final DataPacket first = transaction.receive();
+        assertNotNull(first);
+        assertEquals("contents on server 1", readContents(first));
+        assertEquals(Transaction.TransactionState.DATA_EXCHANGED, transaction.getState());
+
+        // A second receive signals the end of the data.
+        final DataPacket endOfData = transaction.receive();
+        assertNull(endOfData);
+
+        transaction.confirm();
+        assertEquals(Transaction.TransactionState.TRANSACTION_CONFIRMED, transaction.getState());
+
+        final TransactionCompletion result = transaction.complete();
+        assertEquals(Transaction.TransactionState.TRANSACTION_COMPLETED, transaction.getState());
+        assertFalse("Should NOT be backoff", result.isBackoff());
+        assertEquals(1, result.getDataPacketsTransferred());
+    }
+
+    /**
+     * Runs a receive scenario that expects two packets ("contents on server 1" and then
+     * "contents on server 2"), followed by {@code null}, then confirms and completes.
+     * Asserts the state before the first receive and after each step, that the completion
+     * is not a backoff and that two data packets were transferred.
+     *
+     * @param transaction transaction under test, expected to be in TRANSACTION_STARTED state
+     * @throws IOException if receive, confirm or complete fails
+     */
+    public static void execReceiveTwoFlowFiles(Transaction transaction) throws IOException {
+        // Check the starting state, as the other scenario helpers in this class do.
+        assertEquals(Transaction.TransactionState.TRANSACTION_STARTED, transaction.getState());
+
+        DataPacket packet = transaction.receive();
+        assertNotNull(packet);
+        assertEquals("contents on server 1", readContents(packet));
+        assertEquals(Transaction.TransactionState.DATA_EXCHANGED, transaction.getState());
+
+        packet = transaction.receive();
+        assertNotNull(packet);
+        assertEquals("contents on server 2", readContents(packet));
+        assertEquals(Transaction.TransactionState.DATA_EXCHANGED, transaction.getState());
+
+        // A third receive signals the end of the data.
+        packet = transaction.receive();
+        assertNull(packet);
+
+        transaction.confirm();
+        assertEquals(Transaction.TransactionState.TRANSACTION_CONFIRMED, transaction.getState());
+
+        TransactionCompletion completion = transaction.complete();
+        assertEquals(Transaction.TransactionState.TRANSACTION_COMPLETED, transaction.getState());
+        assertFalse("Should NOT be backoff", completion.isBackoff());
+        assertEquals(2, completion.getDataPacketsTransferred());
+    }
+
+    /**
+     * Runs a receive scenario in which two packets arrive but {@code confirm()} is expected to
+     * fail with an IOException whose message contains "Received a BadChecksum response".
+     * Afterwards {@code complete()} is expected to throw IllegalStateException, and the
+     * transaction is expected to be in the ERROR state in both cases.
+     *
+     * @param transaction transaction under test, expected to be in TRANSACTION_STARTED state
+     * @throws IOException if receive fails, or if complete fails with an I/O error
+     */
+    public static void execReceiveWithInvalidChecksum(Transaction transaction) throws IOException {
+        assertEquals(Transaction.TransactionState.TRANSACTION_STARTED, transaction.getState());
+
+        DataPacket packet = transaction.receive();
+        assertNotNull(packet);
+        assertEquals("contents on server 1", readContents(packet));
+        assertEquals(Transaction.TransactionState.DATA_EXCHANGED, transaction.getState());
+
+        packet = transaction.receive();
+        assertNotNull(packet);
+        assertEquals("contents on server 2", readContents(packet));
+        assertEquals(Transaction.TransactionState.DATA_EXCHANGED, transaction.getState());
+
+        packet = transaction.receive();
+        assertNull(packet);
+
+        try {
+            transaction.confirm();
+            // Give the failure a message so a missing exception is self-explanatory.
+            fail("confirm() should have failed because of the bad checksum response.");
+        } catch (IOException e){
+            assertTrue(e.getMessage().contains("Received a BadChecksum response"));
+            assertEquals(Transaction.TransactionState.ERROR, transaction.getState());
+        }
+
+        try {
+            transaction.complete();
+            fail("It's not confirmed.");
+        } catch (IllegalStateException e){
+            assertEquals(Transaction.TransactionState.ERROR, transaction.getState());
+        }
+    }
+
+    /**
+     * Runs a send scenario in which nothing is sent: both {@code confirm()} and
+     * {@code complete()} are expected to throw IllegalStateException.
+     *
+     * @param transaction transaction under test, expected to be in TRANSACTION_STARTED state
+     * @throws IOException if confirm or complete fails with an I/O error instead
+     */
+    public static void execSendZeroFlowFile(Transaction transaction) throws IOException {
+        assertEquals(Transaction.TransactionState.TRANSACTION_STARTED, transaction.getState());
+
+        try {
+            transaction.confirm();
+            fail("Nothing has been sent.");
+        } catch (IllegalStateException expected) {
+            // Expected: confirming without having sent anything is rejected.
+        }
+
+        try {
+            transaction.complete();
+            fail("Nothing has been sent.");
+        } catch (IllegalStateException expected) {
+            // Expected: completing without having sent anything is rejected.
+        }
+    }
+
+    /**
+     * Runs a send scenario with a single packet ("contents on client 1"), then confirms and
+     * completes. Asserts the state after confirm and complete, that the completion is not a
+     * backoff and that one data packet was transferred.
+     *
+     * @param transaction transaction under test, expected to be in TRANSACTION_STARTED state
+     * @throws IOException if send, confirm or complete fails
+     */
+    public static void execSendOneFlowFile(Transaction transaction) throws IOException {
+        assertEquals(Transaction.TransactionState.TRANSACTION_STARTED, transaction.getState());
+
+        transaction.send(createDataPacket("contents on client 1"));
+
+        transaction.confirm();
+        assertEquals(Transaction.TransactionState.TRANSACTION_CONFIRMED, transaction.getState());
+
+        final TransactionCompletion result = transaction.complete();
+        assertEquals(Transaction.TransactionState.TRANSACTION_COMPLETED, transaction.getState());
+        assertFalse("Should NOT be backoff", result.isBackoff());
+        assertEquals(1, result.getDataPacketsTransferred());
+    }
+
+    /**
+     * Runs a send scenario with two packets ("contents on client 1" and "contents on client 2"),
+     * then confirms and completes. Asserts the state after confirm and complete, that the
+     * completion is not a backoff and that two data packets were transferred.
+     *
+     * @param transaction transaction under test, expected to be in TRANSACTION_STARTED state
+     * @throws IOException if send, confirm or complete fails
+     */
+    public static void execSendTwoFlowFiles(Transaction transaction) throws IOException {
+        assertEquals(Transaction.TransactionState.TRANSACTION_STARTED, transaction.getState());
+
+        final DataPacket first = createDataPacket("contents on client 1");
+        transaction.send(first);
+
+        final DataPacket second = createDataPacket("contents on client 2");
+        transaction.send(second);
+
+        transaction.confirm();
+        assertEquals(Transaction.TransactionState.TRANSACTION_CONFIRMED, transaction.getState());
+
+        final TransactionCompletion result = transaction.complete();
+        assertEquals(Transaction.TransactionState.TRANSACTION_COMPLETED, transaction.getState());
+        assertFalse("Should NOT be backoff", result.isBackoff());
+        assertEquals(2, result.getDataPacketsTransferred());
+    }
+
+    /**
+     * Runs a send scenario with two packets in which {@code confirm()} is expected to fail with
+     * an IOException whose message contains
+     * "peer calculated CRC32 Checksum as Different checksum". Afterwards {@code complete()} is
+     * expected to throw IllegalStateException, and the transaction is expected to be in the
+     * ERROR state in both cases.
+     *
+     * @param transaction transaction under test, expected to be in TRANSACTION_STARTED state
+     * @throws IOException if send fails, or if complete fails with an I/O error
+     */
+    public static void execSendWithInvalidChecksum(Transaction transaction) throws IOException {
+        assertEquals(Transaction.TransactionState.TRANSACTION_STARTED, transaction.getState());
+
+        DataPacket packet = createDataPacket("contents on client 1");
+        transaction.send(packet);
+
+        packet = createDataPacket("contents on client 2");
+        transaction.send(packet);
+
+        try {
+            transaction.confirm();
+            // Give the failure a message so a missing exception is self-explanatory.
+            fail("confirm() should have failed because the peer reported a different checksum.");
+        } catch (IOException e){
+            assertTrue(e.getMessage().contains("peer calculated CRC32 Checksum as Different checksum"));
+            assertEquals(Transaction.TransactionState.ERROR, transaction.getState());
+        }
+
+        try {
+            transaction.complete();
+            fail("It's not confirmed.");
+        } catch (IllegalStateException e){
+            assertEquals(Transaction.TransactionState.ERROR, transaction.getState());
+        }
+    }
+
+    /**
+     * Runs a send scenario with two packets that confirms and completes normally, but expects
+     * the completion to report a backoff. Also asserts the state after confirm and complete
+     * and that two data packets were transferred.
+     *
+     * @param transaction transaction under test, expected to be in TRANSACTION_STARTED state
+     * @throws IOException if send, confirm or complete fails
+     */
+    public static void execSendButDestinationFull(Transaction transaction) throws IOException {
+        assertEquals(Transaction.TransactionState.TRANSACTION_STARTED, transaction.getState());
+
+        final DataPacket first = createDataPacket("contents on client 1");
+        transaction.send(first);
+
+        final DataPacket second = createDataPacket("contents on client 2");
+        transaction.send(second);
+
+        transaction.confirm();
+        assertEquals(Transaction.TransactionState.TRANSACTION_CONFIRMED, transaction.getState());
+
+        final TransactionCompletion result = transaction.complete();
+        assertEquals(Transaction.TransactionState.TRANSACTION_COMPLETED, transaction.getState());
+        // Unlike the other send scenarios, a backoff is the expected outcome here.
+        assertTrue("Should be backoff", result.isBackoff());
+        assertEquals(2, result.getDataPacketsTransferred());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/protocol/http/TestHttpClientTransaction.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/protocol/http/TestHttpClientTransaction.java b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/protocol/http/TestHttpClientTransaction.java
new file mode 100644
index 0000000..bed5a46
--- /dev/null
+++ b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/protocol/http/TestHttpClientTransaction.java
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.protocol.http;
+
+import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.remote.Peer;
+import org.apache.nifi.remote.PeerDescription;
+import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.remote.codec.FlowFileCodec;
+import org.apache.nifi.remote.codec.StandardFlowFileCodec;
+import org.apache.nifi.remote.io.http.HttpCommunicationsSession;
+import org.apache.nifi.remote.io.http.HttpInput;
+import org.apache.nifi.remote.io.http.HttpOutput;
+import org.apache.nifi.remote.protocol.CommunicationsSession;
+import org.apache.nifi.remote.protocol.DataPacket;
+import org.apache.nifi.remote.protocol.ResponseCode;
+import org.apache.nifi.remote.util.SiteToSiteRestApiClient;
+import org.apache.nifi.reporting.Severity;
+import org.apache.nifi.stream.io.ByteArrayInputStream;
+import org.apache.nifi.stream.io.ByteArrayOutputStream;
+import org.apache.nifi.web.api.entity.TransactionResultEntity;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import static org.apache.nifi.remote.protocol.ResponseCode.CONFIRM_TRANSACTION;
+import static org.apache.nifi.remote.protocol.SiteToSiteTestUtils.createDataPacket;
+import static org.apache.nifi.remote.protocol.SiteToSiteTestUtils.execReceiveOneFlowFile;
+import static org.apache.nifi.remote.protocol.SiteToSiteTestUtils.execReceiveTwoFlowFiles;
+import static org.apache.nifi.remote.protocol.SiteToSiteTestUtils.execReceiveWithInvalidChecksum;
+import static org.apache.nifi.remote.protocol.SiteToSiteTestUtils.execReceiveZeroFlowFile;
+import static org.apache.nifi.remote.protocol.SiteToSiteTestUtils.execSendButDestinationFull;
+import static org.apache.nifi.remote.protocol.SiteToSiteTestUtils.execSendOneFlowFile;
+import static org.apache.nifi.remote.protocol.SiteToSiteTestUtils.execSendTwoFlowFiles;
+import static org.apache.nifi.remote.protocol.SiteToSiteTestUtils.execSendWithInvalidChecksum;
+import static org.apache.nifi.remote.protocol.SiteToSiteTestUtils.execSendZeroFlowFile;
+import static org.apache.nifi.remote.protocol.SiteToSiteTestUtils.readContents;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+public class TestHttpClientTransaction {
+
+    private Logger logger = LoggerFactory.getLogger(TestHttpClientTransaction.class);
+    private FlowFileCodec codec = new StandardFlowFileCodec();
+
+    /**
+     * Creates and initializes an {@link HttpClientTransaction} whose communications session
+     * reads from and writes to the given streams.
+     *
+     * @param is stream installed as the session's input
+     * @param os stream installed as the session's output
+     * @param apiClient REST API client passed to {@code initialize}
+     * @param direction transfer direction of the transaction
+     * @param transactionUrl transaction URL passed to {@code initialize}
+     * @return the initialized transaction
+     * @throws IOException if creating or initializing the transaction fails
+     */
+    private HttpClientTransaction getClientTransaction(InputStream is, OutputStream os, SiteToSiteRestApiClient apiClient, TransferDirection direction, String transactionUrl) throws IOException {
+        final HttpCommunicationsSession session = new HttpCommunicationsSession();
+        ((HttpInput) session.getInput()).setInputStream(is);
+        ((HttpOutput) session.getOutput()).setOutputStream(os);
+
+        // The peer is built without a description and with empty peer and cluster URLs.
+        final PeerDescription noDescription = null;
+        final Peer peer = new Peer(noDescription, session, "", "");
+
+        // Reported events are only written to the test logger.
+        final EventReporter loggingReporter = new EventReporter() {
+            @Override
+            public void reportEvent(Severity severity, String category, String message) {
+                logger.info("Reporting event... severity={}, category={}, message={}", severity, category, message);
+            }
+        };
+
+        // Arguments: protocol version 5, compression disabled, port id "portId", penalty of 1000 ms.
+        final HttpClientTransaction clientTransaction = new HttpClientTransaction(5, peer, direction, false, "portId", 1000, loggingReporter);
+        clientTransaction.initialize(apiClient, transactionUrl);
+        return clientTransaction;
+    }
+
+    /**
+     * With {@code openConnectionForReceive} stubbed to return {@code false} and an empty
+     * response stream, the zero-flow-file receive scenario must pass and the client must not
+     * write anything to its request stream.
+     */
+    @Test
+    public void testReceiveZeroFlowFile() throws IOException {
+        final String transactionUrl = "http://www.example.com/site-to-site/input-ports/portId/transactions/transactionId";
+        final SiteToSiteRestApiClient restClient = mock(SiteToSiteRestApiClient.class);
+        doReturn(false).when(restClient).openConnectionForReceive(eq(transactionUrl), any(CommunicationsSession.class));
+
+        final ByteArrayInputStream emptyResponse = new ByteArrayInputStream(new byte[0]);
+        final ByteArrayOutputStream requestBody = new ByteArrayOutputStream();
+        final HttpClientTransaction transaction =
+                getClientTransaction(emptyResponse, requestBody, restClient, TransferDirection.RECEIVE, transactionUrl);
+
+        execReceiveZeroFlowFile(transaction);
+
+        assertEquals("Client sends nothing as payload to receive flow files.", 0, requestBody.toByteArray().length);
+    }
+
+    /**
+     * The response stream holds one encoded packet. The one-flow-file receive scenario must
+     * pass, the client must not write anything to its request stream, and the commit must be
+     * issued with CONFIRM_TRANSACTION and the expected checksum string.
+     */
+    @Test
+    public void testReceiveOneFlowFile() throws IOException {
+        final String transactionUrl = "http://www.example.com/site-to-site/input-ports/portId/transactions/transactionId";
+        final String expectedChecksum = "3680976076";
+
+        final SiteToSiteRestApiClient restClient = mock(SiteToSiteRestApiClient.class);
+        doReturn(true).when(restClient).openConnectionForReceive(eq(transactionUrl), any(CommunicationsSession.class));
+        final TransactionResultEntity commitResult = new TransactionResultEntity();
+        commitResult.setResponseCode(CONFIRM_TRANSACTION.getCode());
+        doReturn(commitResult).when(restClient).commitReceivingFlowFiles(eq(transactionUrl), eq(CONFIRM_TRANSACTION), eq(expectedChecksum));
+
+        // Prepare what the server would stream back: a single encoded packet.
+        final ByteArrayOutputStream encodedPackets = new ByteArrayOutputStream();
+        codec.encode(createDataPacket("contents on server 1"), encodedPackets);
+        final ByteArrayInputStream responseBody = new ByteArrayInputStream(encodedPackets.toByteArray());
+        final ByteArrayOutputStream requestBody = new ByteArrayOutputStream();
+        final HttpClientTransaction transaction =
+                getClientTransaction(responseBody, requestBody, restClient, TransferDirection.RECEIVE, transactionUrl);
+
+        execReceiveOneFlowFile(transaction);
+
+        assertEquals("Client sends nothing as payload to receive flow files.", 0, requestBody.toByteArray().length);
+        verify(restClient).commitReceivingFlowFiles(transactionUrl, CONFIRM_TRANSACTION, expectedChecksum);
+    }
+
+    /**
+     * The response stream holds two encoded packets. The two-flow-file receive scenario must
+     * pass, the client must not write anything to its request stream, and the commit must be
+     * issued with CONFIRM_TRANSACTION and the expected checksum string.
+     */
+    @Test
+    public void testReceiveTwoFlowFiles() throws IOException {
+        final String transactionUrl = "http://www.example.com/site-to-site/input-ports/portId/transactions/transactionId";
+        final String expectedChecksum = "2969091230";
+
+        final SiteToSiteRestApiClient restClient = mock(SiteToSiteRestApiClient.class);
+        doReturn(true).when(restClient).openConnectionForReceive(eq(transactionUrl), any(CommunicationsSession.class));
+        final TransactionResultEntity commitResult = new TransactionResultEntity();
+        commitResult.setResponseCode(CONFIRM_TRANSACTION.getCode());
+        doReturn(commitResult).when(restClient).commitReceivingFlowFiles(eq(transactionUrl), eq(CONFIRM_TRANSACTION), eq(expectedChecksum));
+
+        // Prepare what the server would stream back: two encoded packets, back to back.
+        final ByteArrayOutputStream encodedPackets = new ByteArrayOutputStream();
+        codec.encode(createDataPacket("contents on server 1"), encodedPackets);
+        codec.encode(createDataPacket("contents on server 2"), encodedPackets);
+        final ByteArrayInputStream responseBody = new ByteArrayInputStream(encodedPackets.toByteArray());
+        final ByteArrayOutputStream requestBody = new ByteArrayOutputStream();
+        final HttpClientTransaction transaction =
+                getClientTransaction(responseBody, requestBody, restClient, TransferDirection.RECEIVE, transactionUrl);
+
+        execReceiveTwoFlowFiles(transaction);
+
+        assertEquals("Client sends nothing as payload to receive flow files.", 0, requestBody.toByteArray().length);
+        verify(restClient).commitReceivingFlowFiles(transactionUrl, CONFIRM_TRANSACTION, expectedChecksum);
+    }
+
+    /**
+     * Two packets are received, but the stubbed commit answers with BAD_CHECKSUM. The
+     * invalid-checksum receive scenario must pass, the client must not write anything to its
+     * request stream, and the commit must still have been attempted.
+     */
+    @Test
+    public void testReceiveWithInvalidChecksum() throws IOException {
+        final String transactionUrl = "http://www.example.com/site-to-site/input-ports/portId/transactions/transactionId";
+        final String sentChecksum = "2969091230";
+
+        final SiteToSiteRestApiClient restClient = mock(SiteToSiteRestApiClient.class);
+        doReturn(true).when(restClient).openConnectionForReceive(eq(transactionUrl), any(CommunicationsSession.class));
+        // The checksum is correct, but here we simulate as if it's wrong, BAD_CHECKSUM.
+        final TransactionResultEntity badChecksumResult = new TransactionResultEntity();
+        badChecksumResult.setResponseCode(ResponseCode.BAD_CHECKSUM.getCode());
+        doReturn(badChecksumResult).when(restClient).commitReceivingFlowFiles(eq(transactionUrl), eq(CONFIRM_TRANSACTION), eq(sentChecksum));
+
+        final ByteArrayOutputStream encodedPackets = new ByteArrayOutputStream();
+        codec.encode(createDataPacket("contents on server 1"), encodedPackets);
+        codec.encode(createDataPacket("contents on server 2"), encodedPackets);
+        final ByteArrayInputStream responseBody = new ByteArrayInputStream(encodedPackets.toByteArray());
+        final ByteArrayOutputStream requestBody = new ByteArrayOutputStream();
+        final HttpClientTransaction transaction =
+                getClientTransaction(responseBody, requestBody, restClient, TransferDirection.RECEIVE, transactionUrl);
+
+        execReceiveWithInvalidChecksum(transaction);
+
+        assertEquals("Client sends nothing as payload to receive flow files.", 0, requestBody.toByteArray().length);
+        verify(restClient).commitReceivingFlowFiles(transactionUrl, CONFIRM_TRANSACTION, sentChecksum);
+    }
+
+    /**
+     * Nothing is sent: the zero-flow-file send scenario must pass and the client must not have
+     * written anything to its request stream.
+     */
+    @Test
+    public void testSendZeroFlowFile() throws IOException {
+        final String transactionUrl = "http://www.example.com/site-to-site/input-ports/portId/transactions/transactionId";
+        final SiteToSiteRestApiClient restClient = mock(SiteToSiteRestApiClient.class);
+        doNothing().when(restClient).openConnectionForSend(eq(transactionUrl), any(CommunicationsSession.class));
+
+        // The server response is the content of a freshly created, never written buffer.
+        final ByteArrayOutputStream untouchedBuffer = new ByteArrayOutputStream();
+        final ByteArrayInputStream responseBody = new ByteArrayInputStream(untouchedBuffer.toByteArray());
+        final ByteArrayOutputStream requestBody = new ByteArrayOutputStream();
+        final HttpClientTransaction transaction =
+                getClientTransaction(responseBody, requestBody, restClient, TransferDirection.SEND, transactionUrl);
+
+        execSendZeroFlowFile(transaction);
+
+        assertEquals("Client didn't send anything", 0, requestBody.toByteArray().length);
+    }
+
+    /**
+     * Sends one packet. The stubbed {@code finishTransferFlowFiles} sets the checksum
+     * "2946083981" on the session, and the stubbed commit answers TRANSACTION_FINISHED.
+     * The one-flow-file send scenario must pass, the request stream must decode to exactly
+     * one packet with the content "contents on client 1", and the commit must have been issued.
+     */
+    @Test
+    public void testSendOneFlowFile() throws IOException {
+
+        SiteToSiteRestApiClient apiClient = mock(SiteToSiteRestApiClient.class);
+        final String transactionUrl = "http://www.example.com/site-to-site/input-ports/portId/transactions/transactionId";
+        doNothing().when(apiClient).openConnectionForSend(eq(transactionUrl), any(CommunicationsSession.class));
+        // Emulate that server returns correct checksum.
+        // Answer is parameterized (Void) instead of used as a raw type; the stub only has a side effect.
+        doAnswer(new Answer<Void>() {
+            @Override
+            public Void answer(InvocationOnMock invocation) throws Throwable {
+                HttpCommunicationsSession commSession = (HttpCommunicationsSession)invocation.getArguments()[0];
+                commSession.setChecksum("2946083981");
+                return null;
+            }
+        }).when(apiClient).finishTransferFlowFiles(any(CommunicationsSession.class));
+        TransactionResultEntity resultEntity = new TransactionResultEntity();
+        resultEntity.setResponseCode(ResponseCode.TRANSACTION_FINISHED.getCode());
+        doReturn(resultEntity).when(apiClient).commitTransferFlowFiles(eq(transactionUrl), eq(CONFIRM_TRANSACTION));
+
+        ByteArrayOutputStream serverResponseBos = new ByteArrayOutputStream();
+        ByteArrayInputStream serverResponse = new ByteArrayInputStream(serverResponseBos.toByteArray());
+        ByteArrayOutputStream clientRequest = new ByteArrayOutputStream();
+        HttpClientTransaction transaction = getClientTransaction(serverResponse, clientRequest, apiClient, TransferDirection.SEND, transactionUrl);
+
+        execSendOneFlowFile(transaction);
+
+        // Decode what the client wrote and make sure nothing follows the single packet.
+        InputStream sentByClient = new ByteArrayInputStream(clientRequest.toByteArray());
+        DataPacket packetByClient = codec.decode(sentByClient);
+        assertEquals("contents on client 1", readContents(packetByClient));
+        assertEquals(-1, sentByClient.read());
+
+        verify(apiClient).commitTransferFlowFiles(transactionUrl, CONFIRM_TRANSACTION);
+    }
+
+    /**
+     * Sends two packets. The stubbed {@code finishTransferFlowFiles} sets the checksum
+     * "3359812065" on the session, and the stubbed commit answers TRANSACTION_FINISHED.
+     * The two-flow-file send scenario must pass, the request stream must decode to exactly
+     * the two packets in order, and the commit must have been issued.
+     */
+    @Test
+    public void testSendTwoFlowFiles() throws IOException {
+
+        SiteToSiteRestApiClient apiClient = mock(SiteToSiteRestApiClient.class);
+        final String transactionUrl = "http://www.example.com/site-to-site/input-ports/portId/transactions/transactionId";
+        // Stub on the transaction URL, as the other send tests do (this was eq("portId") before).
+        doNothing().when(apiClient).openConnectionForSend(eq(transactionUrl), any(CommunicationsSession.class));
+        // Emulate that server returns correct checksum.
+        // Answer is parameterized (Void) instead of used as a raw type; the stub only has a side effect.
+        doAnswer(new Answer<Void>() {
+            @Override
+            public Void answer(InvocationOnMock invocation) throws Throwable {
+                HttpCommunicationsSession commSession = (HttpCommunicationsSession)invocation.getArguments()[0];
+                commSession.setChecksum("3359812065");
+                return null;
+            }
+        }).when(apiClient).finishTransferFlowFiles(any(CommunicationsSession.class));
+        TransactionResultEntity resultEntity = new TransactionResultEntity();
+        resultEntity.setResponseCode(ResponseCode.TRANSACTION_FINISHED.getCode());
+        doReturn(resultEntity).when(apiClient).commitTransferFlowFiles(eq(transactionUrl), eq(CONFIRM_TRANSACTION));
+
+        ByteArrayOutputStream serverResponseBos = new ByteArrayOutputStream();
+        ByteArrayInputStream serverResponse = new ByteArrayInputStream(serverResponseBos.toByteArray());
+        ByteArrayOutputStream clientRequest = new ByteArrayOutputStream();
+        HttpClientTransaction transaction = getClientTransaction(serverResponse, clientRequest, apiClient, TransferDirection.SEND, transactionUrl);
+
+        execSendTwoFlowFiles(transaction);
+
+        // Decode what the client wrote and make sure nothing follows the second packet.
+        InputStream sentByClient = new ByteArrayInputStream(clientRequest.toByteArray());
+        DataPacket packetByClient = codec.decode(sentByClient);
+        assertEquals("contents on client 1", readContents(packetByClient));
+        packetByClient = codec.decode(sentByClient);
+        assertEquals("contents on client 2", readContents(packetByClient));
+        assertEquals(-1, sentByClient.read());
+
+        verify(apiClient).commitTransferFlowFiles(transactionUrl, CONFIRM_TRANSACTION);
+    }
+
+    @Test
+    public void testSendWithInvalidChecksum() throws IOException { // SEND scenario: the stubbed server reports a checksum that differs, and the commit is expected to carry BAD_CHECKSUM
+        SiteToSiteRestApiClient apiClient = mock(SiteToSiteRestApiClient.class); // REST client is a Mockito mock; only the stubs below define its behavior
+        final String transactionUrl = "http://www.example.com/site-to-site/input-ports/portId/transactions/transactionId";
+        doNothing().when(apiClient).openConnectionForSend(eq(transactionUrl), any(CommunicationsSession.class)); // NOTE(review): sibling send tests stub this with eq("portId") - confirm which first argument is intended
+        // Emulate that server returns incorrect checksum: the stub stores a non-matching value on the session handed to finishTransferFlowFiles.
+        doAnswer(new Answer() {
+            @Override
+            public Object answer(InvocationOnMock invocation) throws Throwable {
+                HttpCommunicationsSession commSession = (HttpCommunicationsSession)invocation.getArguments()[0]; // first argument of finishTransferFlowFiles
+                commSession.setChecksum("Different checksum");
+                return null;
+            }
+        }).when(apiClient).finishTransferFlowFiles(any(CommunicationsSession.class));
+        doAnswer(new Answer() {
+            @Override
+            public Object answer(InvocationOnMock invocation) throws Throwable {
+                TransactionResultEntity serverResult = new TransactionResultEntity();
+                serverResult.setResponseCode(ResponseCode.CANCEL_TRANSACTION.getCode()); // stubbed reply to a BAD_CHECKSUM commit
+                return serverResult;
+            }
+        }).when(apiClient).commitTransferFlowFiles(eq(transactionUrl), eq(ResponseCode.BAD_CHECKSUM));
+
+        ByteArrayOutputStream serverResponseBos = new ByteArrayOutputStream();
+        ByteArrayInputStream serverResponse = new ByteArrayInputStream(serverResponseBos.toByteArray()); // nothing was written to serverResponseBos, so this stream is empty
+        ByteArrayOutputStream clientRequest = new ByteArrayOutputStream(); // captures the bytes produced on the client side
+        HttpClientTransaction transaction = getClientTransaction(serverResponse, clientRequest, apiClient, TransferDirection.SEND, transactionUrl);
+
+        execSendWithInvalidChecksum(transaction); // scenario helper defined outside this view
+
+        InputStream sentByClient = new ByteArrayInputStream(clientRequest.toByteArray()); // replay the captured request bytes
+        DataPacket packetByClient = codec.decode(sentByClient);
+        assertEquals("contents on client 1", readContents(packetByClient));
+        packetByClient = codec.decode(sentByClient);
+        assertEquals("contents on client 2", readContents(packetByClient));
+        assertEquals(-1, sentByClient.read()); // end of stream: nothing follows the two decoded packets
+
+        verify(apiClient).commitTransferFlowFiles(transactionUrl, ResponseCode.BAD_CHECKSUM); // the commit call must have been made with BAD_CHECKSUM
+    }
+
+    @Test
+    public void testSendButDestinationFull() throws IOException { // SEND scenario: checksum is accepted, but the stubbed commit reply is TRANSACTION_FINISHED_BUT_DESTINATION_FULL
+
+        SiteToSiteRestApiClient apiClient = mock(SiteToSiteRestApiClient.class); // REST client is a Mockito mock; only the stubs below define its behavior
+        final String transactionUrl = "http://www.example.com/site-to-site/input-ports/portId/transactions/transactionId";
+        doNothing().when(apiClient).openConnectionForSend(eq("portId"), any(CommunicationsSession.class));
+        // Emulate that server returns correct checksum: the stub stores it on the session handed to finishTransferFlowFiles.
+        doAnswer(new Answer() {
+            @Override
+            public Object answer(InvocationOnMock invocation) throws Throwable {
+                HttpCommunicationsSession commSession = (HttpCommunicationsSession)invocation.getArguments()[0]; // first argument of finishTransferFlowFiles
+                commSession.setChecksum("3359812065"); // same value as in testSendTwoFlowFiles
+                return null;
+            }
+        }).when(apiClient).finishTransferFlowFiles(any(CommunicationsSession.class));
+        TransactionResultEntity resultEntity = new TransactionResultEntity();
+        resultEntity.setResponseCode(ResponseCode.TRANSACTION_FINISHED_BUT_DESTINATION_FULL.getCode());
+        doReturn(resultEntity).when(apiClient).commitTransferFlowFiles(eq(transactionUrl), eq(CONFIRM_TRANSACTION)); // committing with CONFIRM_TRANSACTION yields the destination-full result
+
+        ByteArrayOutputStream serverResponseBos = new ByteArrayOutputStream();
+        ByteArrayInputStream serverResponse = new ByteArrayInputStream(serverResponseBos.toByteArray()); // nothing was written to serverResponseBos, so this stream is empty
+        ByteArrayOutputStream clientRequest = new ByteArrayOutputStream(); // captures the bytes produced on the client side
+        HttpClientTransaction transaction = getClientTransaction(serverResponse, clientRequest, apiClient, TransferDirection.SEND, transactionUrl);
+
+        execSendButDestinationFull(transaction); // scenario helper defined outside this view
+
+        InputStream sentByClient = new ByteArrayInputStream(clientRequest.toByteArray()); // replay the captured request bytes
+        DataPacket packetByClient = codec.decode(sentByClient);
+        assertEquals("contents on client 1", readContents(packetByClient));
+        packetByClient = codec.decode(sentByClient);
+        assertEquals("contents on client 2", readContents(packetByClient));
+        assertEquals(-1, sentByClient.read()); // end of stream: nothing follows the two decoded packets
+
+        verify(apiClient).commitTransferFlowFiles(transactionUrl, CONFIRM_TRANSACTION); // the commit call must have been made with CONFIRM_TRANSACTION
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/protocol/socket/TestSocketClientTransaction.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/protocol/socket/TestSocketClientTransaction.java b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/protocol/socket/TestSocketClientTransaction.java
new file mode 100644
index 0000000..9624f44
--- /dev/null
+++ b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/protocol/socket/TestSocketClientTransaction.java
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.protocol.socket;
+
+import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.remote.Peer;
+import org.apache.nifi.remote.PeerDescription;
+import org.apache.nifi.remote.Transaction;
+import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.remote.codec.FlowFileCodec;
+import org.apache.nifi.remote.codec.StandardFlowFileCodec;
+import org.apache.nifi.remote.io.socket.SocketChannelCommunicationsSession;
+import org.apache.nifi.remote.io.socket.SocketChannelInput;
+import org.apache.nifi.remote.io.socket.SocketChannelOutput;
+import org.apache.nifi.remote.protocol.DataPacket;
+import org.apache.nifi.remote.protocol.RequestType;
+import org.apache.nifi.remote.protocol.Response;
+import org.apache.nifi.remote.protocol.ResponseCode;
+import org.apache.nifi.stream.io.ByteArrayInputStream;
+import org.apache.nifi.stream.io.ByteArrayOutputStream;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import static org.apache.nifi.remote.protocol.SiteToSiteTestUtils.createDataPacket;
+import static org.apache.nifi.remote.protocol.SiteToSiteTestUtils.execReceiveOneFlowFile;
+import static org.apache.nifi.remote.protocol.SiteToSiteTestUtils.execReceiveTwoFlowFiles;
+import static org.apache.nifi.remote.protocol.SiteToSiteTestUtils.execReceiveWithInvalidChecksum;
+import static org.apache.nifi.remote.protocol.SiteToSiteTestUtils.execReceiveZeroFlowFile;
+import static org.apache.nifi.remote.protocol.SiteToSiteTestUtils.execSendButDestinationFull;
+import static org.apache.nifi.remote.protocol.SiteToSiteTestUtils.execSendOneFlowFile;
+import static org.apache.nifi.remote.protocol.SiteToSiteTestUtils.execSendTwoFlowFiles;
+import static org.apache.nifi.remote.protocol.SiteToSiteTestUtils.execSendWithInvalidChecksum;
+import static org.apache.nifi.remote.protocol.SiteToSiteTestUtils.execSendZeroFlowFile;
+import static org.apache.nifi.remote.protocol.SiteToSiteTestUtils.readContents;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestSocketClientTransaction { // Drives SocketClientTransaction with pre-built in-memory server responses and asserts on the bytes the client wrote
+
+    private Logger logger = LoggerFactory.getLogger(TestSocketClientTransaction.class); // not referenced anywhere else in this class as shown
+    private FlowFileCodec codec = new StandardFlowFileCodec(); // used both to script server-side packets and to decode what the client sent
+
+    private SocketClientTransaction getClientTransaction(ByteArrayInputStream bis, ByteArrayOutputStream bos, TransferDirection direction) throws IOException { // builds a transaction whose mocked session reads from bis and writes to bos
+        PeerDescription description = null; // passed to Peer as null; presumably unused on the paths these tests exercise
+        String peerUrl = "";
+        SocketChannelCommunicationsSession commsSession = mock(SocketChannelCommunicationsSession.class);
+        SocketChannelInput socketIn = mock(SocketChannelInput.class);
+        SocketChannelOutput socketOut = mock(SocketChannelOutput.class);
+        when(commsSession.getInput()).thenReturn(socketIn);
+        when(commsSession.getOutput()).thenReturn(socketOut);
+
+        when(socketIn.getInputStream()).thenReturn(bis); // mocked input hands out the caller-supplied (server response) stream
+        when(socketOut.getOutputStream()).thenReturn(bos); // mocked output hands out the caller-supplied capture stream
+
+        String clusterUrl = "";
+        Peer peer = new Peer(description, commsSession, peerUrl, clusterUrl);
+        boolean useCompression = false;
+        int penaltyMillis = 1000;
+        EventReporter eventReporter = null;
+        int protocolVersion = 5;
+        String destinationId = "destinationId";
+        return new SocketClientTransaction(protocolVersion, destinationId, peer, codec, direction, useCompression, penaltyMillis, eventReporter);
+    }
+
+    @Test
+    public void testReceiveZeroFlowFile() throws IOException { // RECEIVE scenario: the scripted server answers NO_MORE_DATA only
+
+        ByteArrayOutputStream serverResponseBos = new ByteArrayOutputStream();
+        DataOutputStream serverResponse = new DataOutputStream(serverResponseBos);
+        ResponseCode.NO_MORE_DATA.writeResponse(serverResponse);
+
+        ByteArrayInputStream bis = new ByteArrayInputStream(serverResponseBos.toByteArray()); // scripted server bytes
+        ByteArrayOutputStream bos = new ByteArrayOutputStream(); // captures client output
+
+        SocketClientTransaction transaction = getClientTransaction(bis, bos, TransferDirection.RECEIVE);
+
+        execReceiveZeroFlowFile(transaction); // shared scenario helper, statically imported from SiteToSiteTestUtils
+
+        // Verify what client has sent: only the RECEIVE_FLOWFILES request type is expected.
+        DataInputStream sentByClient = new DataInputStream(new ByteArrayInputStream(bos.toByteArray()));
+        assertEquals(RequestType.RECEIVE_FLOWFILES, RequestType.readRequestType(sentByClient));
+        assertEquals(-1, sentByClient.read()); // end of stream: nothing else was written
+    }
+
+    @Test
+    public void testReceiveOneFlowFile() throws IOException { // RECEIVE scenario: one packet, then FINISH_TRANSACTION and a CONFIRM_TRANSACTION from the scripted server
+
+        ByteArrayOutputStream serverResponseBos = new ByteArrayOutputStream();
+        DataOutputStream serverResponse = new DataOutputStream(serverResponseBos);
+        ResponseCode.MORE_DATA.writeResponse(serverResponse);
+        codec.encode(createDataPacket("contents on server 1"), serverResponse);
+        ResponseCode.FINISH_TRANSACTION.writeResponse(serverResponse);
+        ResponseCode.CONFIRM_TRANSACTION.writeResponse(serverResponse, "Checksum has been verified at server.");
+
+        ByteArrayInputStream bis = new ByteArrayInputStream(serverResponseBos.toByteArray()); // scripted server bytes
+        ByteArrayOutputStream bos = new ByteArrayOutputStream(); // captures client output
+
+        SocketClientTransaction transaction = getClientTransaction(bis, bos, TransferDirection.RECEIVE);
+
+        execReceiveOneFlowFile(transaction); // shared scenario helper, statically imported from SiteToSiteTestUtils
+
+        // Verify what client has sent: request type, CONFIRM_TRANSACTION carrying the checksum, then TRANSACTION_FINISHED.
+        DataInputStream sentByClient = new DataInputStream(new ByteArrayInputStream(bos.toByteArray()));
+        assertEquals(RequestType.RECEIVE_FLOWFILES, RequestType.readRequestType(sentByClient));
+        Response confirmResponse = Response.read(sentByClient);
+        assertEquals(ResponseCode.CONFIRM_TRANSACTION, confirmResponse.getCode());
+        assertEquals("Checksum should be calculated at client", "3680976076", confirmResponse.getMessage());
+        Response completeResponse = Response.read(sentByClient);
+        assertEquals(ResponseCode.TRANSACTION_FINISHED, completeResponse.getCode());
+        assertEquals(-1, sentByClient.read()); // end of stream: nothing else was written
+    }
+
+    @Test
+    public void testReceiveTwoFlowFiles() throws IOException { // RECEIVE scenario: two packets separated by CONTINUE_TRANSACTION, then FINISH_TRANSACTION and CONFIRM_TRANSACTION
+
+        ByteArrayOutputStream serverResponseBos = new ByteArrayOutputStream();
+        DataOutputStream serverResponse = new DataOutputStream(serverResponseBos);
+        ResponseCode.MORE_DATA.writeResponse(serverResponse);
+        codec.encode(createDataPacket("contents on server 1"), serverResponse);
+        ResponseCode.CONTINUE_TRANSACTION.writeResponse(serverResponse);
+        codec.encode(createDataPacket("contents on server 2"), serverResponse);
+        ResponseCode.FINISH_TRANSACTION.writeResponse(serverResponse);
+        ResponseCode.CONFIRM_TRANSACTION.writeResponse(serverResponse, "Checksum has been verified at server.");
+
+
+        ByteArrayInputStream bis = new ByteArrayInputStream(serverResponseBos.toByteArray()); // scripted server bytes
+        ByteArrayOutputStream bos = new ByteArrayOutputStream(); // captures client output
+
+        SocketClientTransaction transaction = getClientTransaction(bis, bos, TransferDirection.RECEIVE);
+
+        assertEquals(Transaction.TransactionState.TRANSACTION_STARTED, transaction.getState()); // state right after construction, before any exchange is driven
+
+        execReceiveTwoFlowFiles(transaction); // shared scenario helper, statically imported from SiteToSiteTestUtils
+
+        // Verify what client has sent: request type, CONFIRM_TRANSACTION carrying the checksum, then TRANSACTION_FINISHED.
+        DataInputStream sentByClient = new DataInputStream(new ByteArrayInputStream(bos.toByteArray()));
+        assertEquals(RequestType.RECEIVE_FLOWFILES, RequestType.readRequestType(sentByClient));
+        Response confirmResponse = Response.read(sentByClient);
+        assertEquals(ResponseCode.CONFIRM_TRANSACTION, confirmResponse.getCode());
+        assertEquals("Checksum should be calculated at client", "2969091230", confirmResponse.getMessage());
+        Response completeResponse = Response.read(sentByClient);
+        assertEquals(ResponseCode.TRANSACTION_FINISHED, completeResponse.getCode());
+        assertEquals(-1, sentByClient.read()); // end of stream: nothing else was written
+    }
+
+    @Test
+    public void testReceiveWithInvalidChecksum() throws IOException { // RECEIVE scenario: same two packets, but the scripted server ends with BAD_CHECKSUM
+
+        ByteArrayOutputStream serverResponseBos = new ByteArrayOutputStream();
+        DataOutputStream serverResponse = new DataOutputStream(serverResponseBos);
+        ResponseCode.MORE_DATA.writeResponse(serverResponse);
+        codec.encode(createDataPacket("contents on server 1"), serverResponse);
+        ResponseCode.CONTINUE_TRANSACTION.writeResponse(serverResponse);
+        codec.encode(createDataPacket("contents on server 2"), serverResponse);
+        ResponseCode.FINISH_TRANSACTION.writeResponse(serverResponse);
+        ResponseCode.BAD_CHECKSUM.writeResponse(serverResponse);
+
+
+        ByteArrayInputStream bis = new ByteArrayInputStream(serverResponseBos.toByteArray()); // scripted server bytes
+        ByteArrayOutputStream bos = new ByteArrayOutputStream(); // captures client output
+
+        SocketClientTransaction transaction = getClientTransaction(bis, bos, TransferDirection.RECEIVE);
+
+        execReceiveWithInvalidChecksum(transaction); // shared scenario helper, statically imported from SiteToSiteTestUtils
+
+        // Verify what client has sent: request type and CONFIRM_TRANSACTION with the checksum, and nothing after that.
+        DataInputStream sentByClient = new DataInputStream(new ByteArrayInputStream(bos.toByteArray()));
+        assertEquals(RequestType.RECEIVE_FLOWFILES, RequestType.readRequestType(sentByClient));
+        Response confirmResponse = Response.read(sentByClient);
+        assertEquals(ResponseCode.CONFIRM_TRANSACTION, confirmResponse.getCode());
+        assertEquals("Checksum should be calculated at client", "2969091230", confirmResponse.getMessage());
+        assertEquals(-1, sentByClient.read()); // unlike testReceiveTwoFlowFiles, no TRANSACTION_FINISHED is expected here
+    }
+
+
+    @Test
+    public void testSendZeroFlowFile() throws IOException { // SEND scenario with no packets; the scripted server response is empty
+
+        ByteArrayOutputStream serverResponseBos = new ByteArrayOutputStream();
+
+        ByteArrayInputStream bis = new ByteArrayInputStream(serverResponseBos.toByteArray()); // nothing was written to serverResponseBos, so this stream is empty
+        ByteArrayOutputStream bos = new ByteArrayOutputStream(); // captures client output
+
+        SocketClientTransaction transaction = getClientTransaction(bis, bos, TransferDirection.SEND);
+
+        execSendZeroFlowFile(transaction); // shared scenario helper, statically imported from SiteToSiteTestUtils
+
+        // Verify what client has sent: only the SEND_FLOWFILES request type is expected.
+        DataInputStream sentByClient = new DataInputStream(new ByteArrayInputStream(bos.toByteArray()));
+        assertEquals(RequestType.SEND_FLOWFILES, RequestType.readRequestType(sentByClient));
+        assertEquals(-1, sentByClient.read()); // end of stream: nothing else was written
+    }
+
+    @Test
+    public void testSendOneFlowFile() throws IOException { // SEND scenario: one packet; the scripted server confirms with a checksum and then TRANSACTION_FINISHED
+
+        ByteArrayOutputStream serverResponseBos = new ByteArrayOutputStream();
+        DataOutputStream serverResponse = new DataOutputStream(serverResponseBos);
+        ResponseCode.CONFIRM_TRANSACTION.writeResponse(serverResponse, "2946083981"); // checksum value the scripted server reports back
+        ResponseCode.TRANSACTION_FINISHED.writeResponse(serverResponse);
+
+        ByteArrayInputStream bis = new ByteArrayInputStream(serverResponseBos.toByteArray()); // scripted server bytes
+        ByteArrayOutputStream bos = new ByteArrayOutputStream(); // captures client output
+
+        SocketClientTransaction transaction = getClientTransaction(bis, bos, TransferDirection.SEND);
+
+        execSendOneFlowFile(transaction); // shared scenario helper, statically imported from SiteToSiteTestUtils
+
+        // Verify what client has sent: request type, the packet, FINISH_TRANSACTION, then CONFIRM_TRANSACTION.
+        DataInputStream sentByClient = new DataInputStream(new ByteArrayInputStream(bos.toByteArray()));
+        assertEquals(RequestType.SEND_FLOWFILES, RequestType.readRequestType(sentByClient));
+        DataPacket packetByClient = codec.decode(sentByClient);
+        assertEquals("contents on client 1", readContents(packetByClient));
+        Response endOfDataResponse = Response.read(sentByClient);
+        assertEquals(ResponseCode.FINISH_TRANSACTION, endOfDataResponse.getCode());
+        Response confirmResponse = Response.read(sentByClient);
+        assertEquals(ResponseCode.CONFIRM_TRANSACTION, confirmResponse.getCode());
+        assertEquals(-1, sentByClient.read()); // end of stream: nothing else was written
+    }
+
+    @Test
+    public void testSendTwoFlowFiles() throws IOException { // SEND scenario: two packets; the scripted server confirms with a checksum and then TRANSACTION_FINISHED
+
+        ByteArrayOutputStream serverResponseBos = new ByteArrayOutputStream();
+        DataOutputStream serverResponse = new DataOutputStream(serverResponseBos);
+        ResponseCode.CONFIRM_TRANSACTION.writeResponse(serverResponse, "3359812065"); // checksum value the scripted server reports back
+        ResponseCode.TRANSACTION_FINISHED.writeResponse(serverResponse);
+
+        ByteArrayInputStream bis = new ByteArrayInputStream(serverResponseBos.toByteArray()); // scripted server bytes
+        ByteArrayOutputStream bos = new ByteArrayOutputStream(); // captures client output
+
+        SocketClientTransaction transaction = getClientTransaction(bis, bos, TransferDirection.SEND);
+
+        execSendTwoFlowFiles(transaction); // shared scenario helper, statically imported from SiteToSiteTestUtils
+
+        // Verify what client has sent: request type, packet 1, CONTINUE_TRANSACTION, packet 2, FINISH_TRANSACTION, CONFIRM_TRANSACTION.
+        DataInputStream sentByClient = new DataInputStream(new ByteArrayInputStream(bos.toByteArray()));
+        assertEquals(RequestType.SEND_FLOWFILES, RequestType.readRequestType(sentByClient));
+        DataPacket packetByClient = codec.decode(sentByClient);
+        assertEquals("contents on client 1", readContents(packetByClient));
+        Response continueDataResponse = Response.read(sentByClient);
+        assertEquals(ResponseCode.CONTINUE_TRANSACTION, continueDataResponse.getCode());
+        packetByClient = codec.decode(sentByClient);
+        assertEquals("contents on client 2", readContents(packetByClient));
+        Response endOfDataResponse = Response.read(sentByClient);
+        assertEquals(ResponseCode.FINISH_TRANSACTION, endOfDataResponse.getCode());
+        Response confirmResponse = Response.read(sentByClient);
+        assertEquals(ResponseCode.CONFIRM_TRANSACTION, confirmResponse.getCode());
+        assertEquals(-1, sentByClient.read()); // end of stream: nothing else was written
+    }
+
+    @Test
+    public void testSendWithInvalidChecksum() throws IOException { // SEND scenario: the scripted server confirms with a non-matching checksum string and sends nothing further
+
+        ByteArrayOutputStream serverResponseBos = new ByteArrayOutputStream();
+        DataOutputStream serverResponse = new DataOutputStream(serverResponseBos);
+        ResponseCode.CONFIRM_TRANSACTION.writeResponse(serverResponse, "Different checksum");
+
+        ByteArrayInputStream bis = new ByteArrayInputStream(serverResponseBos.toByteArray()); // scripted server bytes
+        ByteArrayOutputStream bos = new ByteArrayOutputStream(); // captures client output
+
+        SocketClientTransaction transaction = getClientTransaction(bis, bos, TransferDirection.SEND);
+
+        execSendWithInvalidChecksum(transaction); // shared scenario helper, statically imported from SiteToSiteTestUtils
+
+        // Verify what client has sent: same sequence as the two-FlowFile send, except the final response is BAD_CHECKSUM.
+        DataInputStream sentByClient = new DataInputStream(new ByteArrayInputStream(bos.toByteArray()));
+        assertEquals(RequestType.SEND_FLOWFILES, RequestType.readRequestType(sentByClient));
+        DataPacket packetByClient = codec.decode(sentByClient);
+        assertEquals("contents on client 1", readContents(packetByClient));
+        Response continueDataResponse = Response.read(sentByClient);
+        assertEquals(ResponseCode.CONTINUE_TRANSACTION, continueDataResponse.getCode());
+        packetByClient = codec.decode(sentByClient);
+        assertEquals("contents on client 2", readContents(packetByClient));
+        Response endOfDataResponse = Response.read(sentByClient);
+        assertEquals(ResponseCode.FINISH_TRANSACTION, endOfDataResponse.getCode());
+        Response confirmResponse = Response.read(sentByClient);
+        assertEquals(ResponseCode.BAD_CHECKSUM, confirmResponse.getCode());
+        assertEquals(-1, sentByClient.read()); // end of stream: nothing else was written
+    }
+
+
+    @Test
+    public void testSendButDestinationFull() throws IOException { // SEND scenario: checksum confirmed, then the scripted server replies TRANSACTION_FINISHED_BUT_DESTINATION_FULL
+
+        ByteArrayOutputStream serverResponseBos = new ByteArrayOutputStream();
+        DataOutputStream serverResponse = new DataOutputStream(serverResponseBos);
+        ResponseCode.CONFIRM_TRANSACTION.writeResponse(serverResponse, "3359812065"); // same checksum value as in testSendTwoFlowFiles
+        ResponseCode.TRANSACTION_FINISHED_BUT_DESTINATION_FULL.writeResponse(serverResponse);
+
+        ByteArrayInputStream bis = new ByteArrayInputStream(serverResponseBos.toByteArray()); // scripted server bytes
+        ByteArrayOutputStream bos = new ByteArrayOutputStream(); // captures client output
+
+        SocketClientTransaction transaction = getClientTransaction(bis, bos, TransferDirection.SEND);
+
+        execSendButDestinationFull(transaction); // shared scenario helper, statically imported from SiteToSiteTestUtils
+
+        // Verify what client has sent: the asserted byte sequence is the same as in testSendTwoFlowFiles.
+        DataInputStream sentByClient = new DataInputStream(new ByteArrayInputStream(bos.toByteArray()));
+        assertEquals(RequestType.SEND_FLOWFILES, RequestType.readRequestType(sentByClient));
+        DataPacket packetByClient = codec.decode(sentByClient);
+        assertEquals("contents on client 1", readContents(packetByClient));
+        Response continueDataResponse = Response.read(sentByClient);
+        assertEquals(ResponseCode.CONTINUE_TRANSACTION, continueDataResponse.getCode());
+        packetByClient = codec.decode(sentByClient);
+        assertEquals("contents on client 2", readContents(packetByClient));
+        Response endOfDataResponse = Response.read(sentByClient);
+        assertEquals(ResponseCode.FINISH_TRANSACTION, endOfDataResponse.getCode());
+        Response confirmResponse = Response.read(sentByClient);
+        assertEquals(ResponseCode.CONFIRM_TRANSACTION, confirmResponse.getCode());
+        assertEquals(-1, sentByClient.read()); // end of stream: nothing else was written
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-docs/src/main/asciidoc/administration-guide.adoc
----------------------------------------------------------------------
diff --git a/nifi-docs/src/main/asciidoc/administration-guide.adoc b/nifi-docs/src/main/asciidoc/administration-guide.adoc
index 3742cc2..9f7d56d 100644
--- a/nifi-docs/src/main/asciidoc/administration-guide.adoc
+++ b/nifi-docs/src/main/asciidoc/administration-guide.adoc
@@ -1347,12 +1347,18 @@ of 576.
 ==== Site to Site Properties
 
 These properties govern how this instance of NiFi communicates with remote instances of NiFi when Remote Process Groups are configured in the dataflow.
+Remote Process Groups can choose between the RAW and HTTP transport protocols. Properties named _nifi.remote.input.socket.*_ are specific to the RAW transport protocol. Similarly, properties named _nifi.remote.input.http.*_ are specific to the HTTP transport protocol.
 
 |====
 |*Property*|*Description*
-|nifi.remote.input.socket.host|The host name that will be given out to clients to connect to this NiFi instance for Site-to-Site communication. By default, it is the value from InetAddress.getLocalHost().getHostName(). On UNIX-like operating systems, this is typically the output from the `hostname` command.
-|nifi.remote.input.socket.port|The remote input socket port for Site-to-Site communication. By default, it is blank, but it must have a value in order to use Remote Process Groups.
-|nifi.remote.input.secure|This indicates whether communication between this instance of NiFi and remote NiFi instances should be secure. By default, it is set to _true_. In order for secure site-to-site to work, many Security Properties (below) must also be configured.
+|[line-through]#nifi.remote.input.socket.host# +
+nifi.remote.input.host +
+(renamed since NiFi 1.0)|The host name that will be given out to clients to connect to this NiFi instance for Site-to-Site communication. By default, it is the value from InetAddress.getLocalHost().getHostName(). On UNIX-like operating systems, this is typically the output from the `hostname` command.
+|nifi.remote.input.secure|This indicates whether communication between this instance of NiFi and remote NiFi instances should be secure. By default, it is set to _false_. In order for secure site-to-site to work, in addition to changing it to _true_, many Security Properties (below) must also be configured.
+|nifi.remote.input.socket.port|The remote input socket port for Site-to-Site communication. By default, it is blank, but it must have a value in order to use RAW socket as transport protocol for Site-to-Site.
+|nifi.remote.input.http.enabled|Specify if HTTP Site-to-Site should be enabled on this host. By default, it is _true_. HTTP non-secure Site-to-Site is enabled by default. +
+Whether a Site-to-Site client uses HTTP or HTTPS is determined by _nifi.remote.input.secure_. If it is set to _true_, then requests are sent as HTTPS to _nifi.web.https.port_, if it is _false_, HTTP requests are sent to _nifi.web.http.port_.
+|nifi.remote.input.http.transaction.ttl|Specify how long a transaction can stay alive on the server. If a Site-to-Site client doesn't proceed to the next action within this period of time, the transaction is discarded from the remote NiFi instance. For example, a client creates a transaction but doesn't send or receive flow files, or sends or receives flow files but doesn't confirm that transaction. By default, it is set to 30 seconds.|
 |====
 
 ==== Web Properties

http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-docs/src/main/asciidoc/getting-started.adoc
----------------------------------------------------------------------
diff --git a/nifi-docs/src/main/asciidoc/getting-started.adoc b/nifi-docs/src/main/asciidoc/getting-started.adoc
index 486606f..28a89a3 100644
--- a/nifi-docs/src/main/asciidoc/getting-started.adoc
+++ b/nifi-docs/src/main/asciidoc/getting-started.adoc
@@ -418,6 +418,7 @@ categorizing them by their functions.
 - *PostHTTP*: Performs an HTTP POST request, sending the contents of the FlowFile as the body of the message. This is often used in conjunction
   with ListenHTTP in order to transfer data between two different instances of NiFi in cases where Site-to-Site cannot be used (for instance,
 	when the nodes cannot access each other directly and are able to communicate through an HTTP proxy).
+	*Note*: HTTP is available as a link:user-guide.html#site-to-site[Site-to-Site] transport protocol in addition to the existing RAW socket transport. It also supports HTTP Proxy. Using HTTP Site-to-Site is recommended since it's more scalable, and can provide bi-directional data transfer using input/output ports with better user authentication and authorization.
 - *HandleHttpRequest* / *HandleHttpResponse*: The HandleHttpRequest Processor is a Source Processor that starts an embedded HTTP(S) server
   similarly to ListenHTTP. However, it does not send a response to the client. Instead, the FlowFile is sent out with the body of the HTTP request
 	as its contents and attributes for all of the typical Servlet parameters, headers, etc. as Attributes. The HandleHttpResponse then is able to

http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-docs/src/main/asciidoc/images/configure-remote-process-group.png
----------------------------------------------------------------------
diff --git a/nifi-docs/src/main/asciidoc/images/configure-remote-process-group.png b/nifi-docs/src/main/asciidoc/images/configure-remote-process-group.png
new file mode 100644
index 0000000..3f57b0e
Binary files /dev/null and b/nifi-docs/src/main/asciidoc/images/configure-remote-process-group.png differ

http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-docs/src/main/asciidoc/user-guide.adoc
----------------------------------------------------------------------
diff --git a/nifi-docs/src/main/asciidoc/user-guide.adoc b/nifi-docs/src/main/asciidoc/user-guide.adoc
index 9e6de2c..2a3f672 100644
--- a/nifi-docs/src/main/asciidoc/user-guide.adoc
+++ b/nifi-docs/src/main/asciidoc/user-guide.adoc
@@ -781,10 +781,23 @@ link:administration-guide.html[Admin Guide].
    This allows new capabilities to be added while still maintaining backward compatibility with all older instances. Additionally, if a vulnerability
    or deficiency is ever discovered in a protocol, it allows a newer version of NiFi to forbid communication over the compromised versions of the protocol.
 
-In order to communicate with a remote NiFi instance via Site-to-Site, simply drag a <<remote_process_group,Remote Process Group>> onto the canvas
+Site-to-Site is a protocol for transferring data between two NiFi instances. Either end can be a standalone NiFi or a NiFi cluster. In this section, the NiFi instance that initiates the communication is called the _Site-to-Site client NiFi instance_ and the other end is called the _Site-to-Site server NiFi instance_, to clarify what configuration is needed on each NiFi instance.
+
+A NiFi instance can be both a client and a server for the Site-to-Site protocol; however, it can only be a client or a server within a specific Site-to-Site communication. For example, suppose there are three NiFi instances A, B and C, where A pushes data to B, and B pulls data from C: _A -- push -> B <- pull -- C_. Then B is not only a _server_ in the communication between A and B, but also a _client_ in the communication between B and C.
+
+It is important to understand which NiFi instance will be the client or server in order to design your data flow, and configure each instance accordingly. Here is a summary of what components run on which side based on data flow direction:
+
+- Push: a client _sends_ data to a Remote Process Group, the server _receives_ it with an Input Port
+
+- Pull: a client _receives_ data from a Remote Process Group, the server _sends_ data through an Output Port
+
+==== Configure Site-to-Site client NiFi instance
+
+[[Site-to-Site_Remote_Process_Group]]
+*Remote Process Group*: In order to communicate with a remote NiFi instance via Site-to-Site, simply drag a <<remote_process_group,Remote Process Group>> onto the canvas
 and enter the URL of the remote NiFi instance (for more information on the components of a Remote Process Group, see 
 <<Remote_Group_Transmission,Remote Process Group Transmission>> section of this guide.) The URL is the same 
-URL you would use to go to that instance's User Interface. At that point, you can drag a connection to or from the Remote Process Group 
+URL you would use to go to that instance's User Interface. At that point, you can drag a connection to or from the Remote Process Group
 in the same way you would drag a connection to or from a Processor or a local Process Group. When you drag the connection, you will have
 a chance to choose which Port to connect to. Note that it may take up to one minute for the Remote Process Group to determine
 which ports are available.
@@ -797,7 +810,17 @@ the ports shown will be the Input Ports of the remote group, as this implies tha
 communicate with. For information on configuring NiFi to run securely, see the
 link:administration-guide.html[Admin Guide].
 
-In order to allow another NiFi instance to push data to your local instance, you can simply drag an <<input_port,Input Port>> onto the Root Process Group
+[[Site-to-Site_Transport_Protocol]]
+*Transport Protocol*: In the Remote Process Group creation or configuration dialog, you can choose the Transport Protocol to use for Site-to-Site communication, as shown in the following image:
+
+image:configure-remote-process-group.png["Configure Remote Process Group", width=395]
+
+By default, it is set to _RAW_, which uses raw socket communication over a dedicated port. The _HTTP_ transport protocol is especially useful if the remote NiFi instance is in a restricted network that only allows access through the HTTP(S) protocol or is only accessible from a specific HTTP Proxy server. For access through an HTTP Proxy Server, BASIC and DIGEST authentication are supported.
+
+==== Configure Site-to-Site server NiFi instance
+
+[[Site-to-Site_Input_Port]]
+*Input Port*: In order to allow another NiFi instance to push data to your local instance, you can simply drag an <<input_port,Input Port>> onto the Root Process Group
 of your canvas. After entering a name for the port, it will be added to your flow. You can now right-click on the Input Port and choose Configure in order
 to adjust the name and the number of concurrent tasks that are used for the port. If Site-to-Site is configured to run securely, you will also be given
 the ability to adjust who has access to the port. If secure, only those who have been granted access to communicate with the port will be able to see
@@ -806,14 +829,16 @@ that the port exists.
 After being given access to a particular port, in order to see that port, the operator of a remote NiFi instance may need to right-click on their Remote
 Process Group and choose to "Refresh" the flow.
 
-Similar to an Input Port, a DataFlow Manager may choose to add an <<output_port,Output Port>> to the Root Process Group. The Output Port allows an 
+[[Site-to-Site_Output_Port]]
+*Output Port*: Similar to an Input Port, a DataFlow Manager may choose to add an <<output_port,Output Port>> to the Root Process Group. The Output Port allows an
 authorized NiFi instance to remotely connect to your instance and pull data from the Output Port. Configuring the Output Port will again allow the
 DFM to control how many concurrent tasks are allowed, as well as which NiFi instances are authorized to pull data from the instance being configured. 
 
 In addition to other instances of NiFi, some other applications may use a Site-to-Site client in order to push data to or receive data from a NiFi instance.
 For example, NiFi provides an Apache Storm spout and an Apache Spark Receiver that are able to pull data from NiFi's Root Group Output Ports.
 
-If your instance of NiFi is running securely, the first time that a client establishes a connection to your instance, the client will be forbidden and
+[[Site-to-Site_Access_Control]]
+*Access Control*: If your instance of NiFi is running securely, the first time that a client establishes a connection to your instance, the client will be forbidden and
 a request for an account for that client will automatically be generated. The client will need to be granted the 'NiFi' role in order to communicate
 via Site-to-Site. For more information on managing user accounts, see the
 link:administration-guide.html#controlling-levels-of-access[Controlling Levels of Access]
@@ -824,8 +849,6 @@ link:administration-guide.html#site_to_site_properties[Site-to-Site Properties]
 link:administration-guide.html[Admin Guide].
 
 
-
-
 === Example Dataflow
 
 This section has described the steps required to build a dataflow. Now, to put it all together. The following example dataflow 

http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ControllerDTO.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ControllerDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ControllerDTO.java
index a47adcf..c433d49 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ControllerDTO.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ControllerDTO.java
@@ -41,6 +41,7 @@ public class ControllerDTO {
     private Integer outputPortCount;
 
     private Integer remoteSiteListeningPort;
+    private Integer remoteSiteHttpListeningPort;
     private Boolean siteToSiteSecure;
     private String instanceId;
     private Set<PortDTO> inputPorts;
@@ -150,6 +151,23 @@ public class ControllerDTO {
     }
 
     /**
+     * The HTTP(S) Port on which this instance is listening for Remote Transfers of Flow Files. If this instance is not configured to receive Flow Files from remote instances, this will be null.
+     *
+     * @return an integer between 1 and 65535, or null, if not configured for remote transfer
+     */
+    @ApiModelProperty(
+            value = "The HTTP(S) Port on which this instance is listening for Remote Transfers of Flow Files. If this instance is not configured to receive Flow Files from remote "
+                    + "instances, this will be null."
+    )
+    public Integer getRemoteSiteHttpListeningPort() {
+        return remoteSiteHttpListeningPort;
+    }
+
+    public void setRemoteSiteHttpListeningPort(Integer remoteSiteHttpListeningPort) {
+        this.remoteSiteHttpListeningPort = remoteSiteHttpListeningPort;
+    }
+
+    /**
      * @return Indicates whether or not Site-to-Site communications with this instance is secure (2-way authentication)
      */
     @ApiModelProperty(
@@ -274,4 +292,5 @@ public class ControllerDTO {
     public void setOutputPortCount(Integer outputPortCount) {
         this.outputPortCount = outputPortCount;
     }
+
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/RemoteProcessGroupDTO.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/RemoteProcessGroupDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/RemoteProcessGroupDTO.java
index b303d4d..340e9b8 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/RemoteProcessGroupDTO.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/RemoteProcessGroupDTO.java
@@ -36,6 +36,11 @@ public class RemoteProcessGroupDTO extends ComponentDTO {
     private String comments;
     private String communicationsTimeout;
     private String yieldDuration;
+    private String transportProtocol;
+    private String proxyHost;
+    private Integer proxyPort;
+    private String proxyUser;
+    private String proxyPassword;
 
     private List<String> authorizationIssues;
     private Boolean transmitting;
@@ -288,4 +293,44 @@ public class RemoteProcessGroupDTO extends ComponentDTO {
         this.flowRefreshed = flowRefreshed;
     }
 
+    public String getTransportProtocol() {
+        return transportProtocol;
+    }
+
+    public void setTransportProtocol(String transportProtocol) {
+        this.transportProtocol = transportProtocol;
+    }
+
+
+    public String getProxyHost() {
+        return proxyHost;
+    }
+
+    public void setProxyHost(String proxyHost) {
+        this.proxyHost = proxyHost;
+    }
+
+    public Integer getProxyPort() {
+        return proxyPort;
+    }
+
+    public void setProxyPort(Integer proxyPort) {
+        this.proxyPort = proxyPort;
+    }
+
+    public String getProxyUser() {
+        return proxyUser;
+    }
+
+    public void setProxyUser(String proxyUser) {
+        this.proxyUser = proxyUser;
+    }
+
+    public String getProxyPassword() {
+        return proxyPassword;
+    }
+
+    public void setProxyPassword(String proxyPassword) {
+        this.proxyPassword = proxyPassword;
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/remote/PeerDTO.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/remote/PeerDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/remote/PeerDTO.java
new file mode 100644
index 0000000..7d76f17
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/remote/PeerDTO.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.web.api.dto.remote;
+
+import com.wordnik.swagger.annotations.ApiModelProperty;
+
+import javax.xml.bind.annotation.XmlType;
+
+/**
+ * Details of a Site-to-Site peer within this NiFi.
+ */
+@XmlType(name = "peer")
+public class PeerDTO {
+
+    private String hostname;
+    private int port;
+    private boolean secure;
+    private int flowFileCount;
+
+    @ApiModelProperty(
+            value = "The hostname of this peer."
+    )
+    public String getHostname() {
+        return hostname;
+    }
+
+    public void setHostname(String hostname) {
+        this.hostname = hostname;
+    }
+
+    @ApiModelProperty(
+            value = "The port number of this peer."
+    )
+    public int getPort() {
+        return port;
+    }
+
+    public void setPort(int port) {
+        this.port = port;
+    }
+
+    @ApiModelProperty(
+            value = "Returns if this peer connection is secure."
+    )
+    public boolean isSecure() {
+        return secure;
+    }
+
+    public void setSecure(boolean secure) {
+        this.secure = secure;
+    }
+
+
+    @ApiModelProperty(
+            value = "The number of flowFiles this peer holds."
+    )
+    public int getFlowFileCount() {
+        return flowFileCount;
+    }
+
+    public void setFlowFileCount(int flowFileCount) {
+        this.flowFileCount = flowFileCount;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/PeersEntity.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/PeersEntity.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/PeersEntity.java
new file mode 100644
index 0000000..836cf38
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/PeersEntity.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.web.api.entity;
+
+import org.apache.nifi.web.api.dto.remote.PeerDTO;
+
+import javax.xml.bind.annotation.XmlRootElement;
+import java.util.Collection;
+
+/**
+ * A serialized representation of this class can be placed in the entity body of a request or response to or from the API.
+ * This particular entity holds a reference to PeerDTOs.
+ */
+@XmlRootElement(name = "peersEntity")
+public class PeersEntity extends Entity {
+
+    private Collection<PeerDTO> peers;
+
+    /**
+     * The PeerDTOs that are being serialized.
+     *
+     * @return the collection of PeerDTO objects
+     */
+    public Collection<PeerDTO> getPeers() {
+        return peers;
+    }
+
+    public void setPeers(Collection<PeerDTO> peers) {
+        this.peers = peers;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/TransactionResultEntity.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/TransactionResultEntity.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/TransactionResultEntity.java
new file mode 100644
index 0000000..de732cd
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/TransactionResultEntity.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.web.api.entity;
+
+import javax.xml.bind.annotation.XmlRootElement;
+
+@XmlRootElement(name = "transactionResultEntity")
+public class TransactionResultEntity extends Entity {
+
+    private int flowFileSent;
+
+    private int responseCode;
+
+    private String message;
+
+    public String getMessage() {
+        return message;
+    }
+
+    public void setMessage(String message) {
+        this.message = message;
+    }
+
+    public int getResponseCode() {
+        return responseCode;
+    }
+
+    public void setResponseCode(int responseCode) {
+        this.responseCode = responseCode;
+    }
+
+   public int getFlowFileSent() {
+        return flowFileSent;
+    }
+
+    public void setFlowFileSent(int flowFileSent) {
+        this.flowFileSent = flowFileSent;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ConnectionResponse.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ConnectionResponse.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ConnectionResponse.java
index 5aea3f1..e31832f 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ConnectionResponse.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ConnectionResponse.java
@@ -40,6 +40,7 @@ public class ConnectionResponse {
     private final NodeIdentifier nodeIdentifier;
     private final DataFlow dataFlow;
     private final Integer managerRemoteInputPort;
+    private final Integer managerRemoteInputHttpPort;
     private final Boolean managerRemoteCommsSecure;
     private final String instanceId;
     private final List<NodeConnectionStatus> nodeStatuses;
@@ -48,7 +49,7 @@ public class ConnectionResponse {
     private volatile String coordinatorDN;
 
     public ConnectionResponse(final NodeIdentifier nodeIdentifier, final DataFlow dataFlow,
-        final Integer managerRemoteInputPort, final Boolean managerRemoteCommsSecure, final String instanceId,
+        final Integer managerRemoteInputPort, final Integer managerRemoteInputHttpPort, final Boolean managerRemoteCommsSecure, final String instanceId,
         final List<NodeConnectionStatus> nodeStatuses, final List<ComponentRevision> componentRevisions) {
 
         if (nodeIdentifier == null) {
@@ -61,6 +62,7 @@ public class ConnectionResponse {
         this.tryLaterSeconds = 0;
         this.rejectionReason = null;
         this.managerRemoteInputPort = managerRemoteInputPort;
+        this.managerRemoteInputHttpPort = managerRemoteInputHttpPort;
         this.managerRemoteCommsSecure = managerRemoteCommsSecure;
         this.instanceId = instanceId;
         this.nodeStatuses = Collections.unmodifiableList(new ArrayList<>(nodeStatuses));
@@ -76,6 +78,7 @@ public class ConnectionResponse {
         this.tryLaterSeconds = tryLaterSeconds;
         this.rejectionReason = null;
         this.managerRemoteInputPort = null;
+        this.managerRemoteInputHttpPort = null;
         this.managerRemoteCommsSecure = null;
         this.instanceId = null;
         this.nodeStatuses = null;
@@ -88,6 +91,7 @@ public class ConnectionResponse {
         this.tryLaterSeconds = 0;
         this.rejectionReason = rejectionReason;
         this.managerRemoteInputPort = null;
+        this.managerRemoteInputHttpPort = null;
         this.managerRemoteCommsSecure = null;
         this.instanceId = null;
         this.nodeStatuses = null;
@@ -130,6 +134,10 @@ public class ConnectionResponse {
         return managerRemoteInputPort;
     }
 
+    public Integer getManagerRemoteInputHttpPort() {
+        return managerRemoteInputHttpPort;
+    }
+
     public Boolean isManagerRemoteCommsSecure() {
         return managerRemoteCommsSecure;
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeIdentifier.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeIdentifier.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeIdentifier.java
index 568fc20..f4475df 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeIdentifier.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeIdentifier.java
@@ -51,7 +51,8 @@ public class NodeIdentifier {
     private final String apiAddress;
 
     /**
-     * the port to use use for sending requests to the node's external interface
+     * the port to use for sending requests to the node's external interface,
+     * this can be the HTTP API port or the HTTPS API port (TODO: document which condition selects HTTPS).
      */
     private final int apiPort;
 
@@ -72,24 +73,31 @@ public class NodeIdentifier {
     private final String siteToSiteAddress;
 
     /**
-     * the port that external clients should use to communicate with this node via Site-to-Site
+     * the port that external clients should use to communicate with this node via Site-to-Site RAW Socket protocol
      */
     private final Integer siteToSitePort;
 
     /**
+     * the port that external clients should use to communicate with this node via Site-to-Site HTTP protocol,
+     * this can be the HTTP API port or the HTTPS API port depending on whether siteToSiteSecure is set.
+     */
+    private final Integer siteToSiteHttpApiPort;
+
+    /**
      * whether or not site-to-site communications with this node are secure
      */
-    private Boolean siteToSiteSecure;
+    private final Boolean siteToSiteSecure;
+
 
     private final String nodeDn;
 
     public NodeIdentifier(final String id, final String apiAddress, final int apiPort, final String socketAddress, final int socketPort,
-        final String siteToSiteAddress, final Integer siteToSitePort, final boolean siteToSiteSecure) {
-        this(id, apiAddress, apiPort, socketAddress, socketPort, siteToSiteAddress, siteToSitePort, siteToSiteSecure, null);
+        final String siteToSiteAddress, final Integer siteToSitePort, final Integer siteToSiteHttpApiPort, final boolean siteToSiteSecure) {
+        this(id, apiAddress, apiPort, socketAddress, socketPort, siteToSiteAddress, siteToSitePort, siteToSiteHttpApiPort, siteToSiteSecure, null);
     }
 
     public NodeIdentifier(final String id, final String apiAddress, final int apiPort, final String socketAddress, final int socketPort,
-        final String siteToSiteAddress, final Integer siteToSitePort, final boolean siteToSiteSecure, final String dn) {
+        final String siteToSiteAddress, final Integer siteToSitePort, final Integer siteToSiteHttpApiPort, final boolean siteToSiteSecure, final String dn) {
 
         if (StringUtils.isBlank(id)) {
             throw new IllegalArgumentException("Node ID may not be empty or null.");
@@ -113,6 +121,7 @@ public class NodeIdentifier {
         this.nodeDn = dn;
         this.siteToSiteAddress = siteToSiteAddress == null ? apiAddress : siteToSiteAddress;
         this.siteToSitePort = siteToSitePort;
+        this.siteToSiteHttpApiPort = siteToSiteHttpApiPort;
         this.siteToSiteSecure = siteToSiteSecure;
     }
 
@@ -128,6 +137,7 @@ public class NodeIdentifier {
         this.nodeDn = null;
         this.siteToSiteAddress = null;
         this.siteToSitePort = null;
+        this.siteToSiteHttpApiPort = null;
         this.siteToSiteSecure = false;
     }
 
@@ -169,6 +179,10 @@ public class NodeIdentifier {
         return siteToSitePort;
     }
 
+    public Integer getSiteToSiteHttpApiPort() {
+        return siteToSiteHttpApiPort;
+    }
+
     public boolean isSiteToSiteSecure() {
         return siteToSiteSecure;
     }


Mime
View raw message