camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject [2/6] camel git commit: CAMEL-10511: Updated MllpTcpClientProducer and MllpTcpServerConsumer to consume all available data on socket - backport to 2.17
Date Fri, 16 Dec 2016 21:59:11 GMT
http://git-wip-us.apache.org/repos/asf/camel/blob/a53540da/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/impl/MllpSocketAcknowledgementWriterTest.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/impl/MllpSocketAcknowledgementWriterTest.java b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/impl/MllpSocketAcknowledgementWriterTest.java
new file mode 100644
index 0000000..9c218a7
--- /dev/null
+++ b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/impl/MllpSocketAcknowledgementWriterTest.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mllp.impl;
+
+import org.apache.camel.component.mllp.MllpAcknowledgementDeliveryException;
+import org.apache.camel.test.util.PayloadBuilder;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.camel.component.mllp.MllpEndpoint.END_OF_BLOCK;
+import static org.apache.camel.component.mllp.MllpEndpoint.END_OF_DATA;
+import static org.apache.camel.component.mllp.MllpEndpoint.START_OF_BLOCK;
+import static org.apache.camel.component.mllp.impl.MllpSocketWriter.PAYLOAD_TERMINATOR;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertNotNull;
+
+public class MllpSocketAcknowledgementWriterTest extends MllpSocketWriterTestSupport {
+    MllpSocketWriter mllpSocketWriter;
+    /** Builds a fresh writer over fakeSocket; the {@code true} flag presumably selects acknowledgement mode (confirm). */
+    @Before
+    public void setUp() throws Exception {
+        mllpSocketWriter = new MllpSocketWriter(fakeSocket, true);
+    }
+    /** The acknowledgement, not the message, is what must appear on the socket inside the MLLP envelope. */
+    @Test
+    public void testWriteAcknowledgement() throws Exception {
+        byte[] expected = PayloadBuilder.build(START_OF_BLOCK, TEST_ACKNOWLEDGEMENT, END_OF_BLOCK, END_OF_DATA);
+
+        mllpSocketWriter.writeEnvelopedPayload(TEST_MESSAGE.getBytes(), TEST_ACKNOWLEDGEMENT.getBytes());
+
+        assertArrayEquals(expected, fakeSocket.payload());
+    }
+    /** A null acknowledgement must still produce the bare envelope (start-of-block, end-of-block, end-of-data). */
+    @Test
+    public void testWriteNullAcknowledgement() throws Exception {
+        byte[] acknowledgement = null;
+
+        byte[] expected = PayloadBuilder.build(START_OF_BLOCK, END_OF_BLOCK, END_OF_DATA);
+
+        mllpSocketWriter.writeEnvelopedPayload(TEST_MESSAGE.getBytes(), acknowledgement);
+
+        assertArrayEquals(expected, fakeSocket.payload());
+    }
+    /** A zero-length acknowledgement must produce the same bare envelope as a null one. */
+    @Test
+    public void testWriteEmptyAcknowledgement() throws Exception {
+        byte[] acknowledgement = new byte[0];
+
+        byte[] expected = PayloadBuilder.build(START_OF_BLOCK, END_OF_BLOCK, END_OF_DATA);
+
+        mllpSocketWriter.writeEnvelopedPayload(TEST_MESSAGE.getBytes(), acknowledgement);
+
+        assertArrayEquals(expected, fakeSocket.payload());
+    }
+    /** With fakeSocketOutputStream set to null the write must fail with a fully populated delivery exception. */
+    @Test(expected = MllpAcknowledgementDeliveryException.class)
+    public void testGetOutputStreamFailure() throws Exception {
+        fakeSocket.fakeSocketOutputStream = null;
+
+        try {
+            mllpSocketWriter.writeEnvelopedPayload(TEST_MESSAGE.getBytes(), TEST_ACKNOWLEDGEMENT.getBytes());
+        } catch (MllpAcknowledgementDeliveryException expectedEx) {
+            verifyException(expectedEx);
+            throw expectedEx;
+        }
+    }
+    /** A socket flagged as not connected must cause a fully populated delivery exception. */
+    @Test(expected = MllpAcknowledgementDeliveryException.class)
+    public void testWriteToUnconnectedSocket() throws Exception {
+        fakeSocket.connected = false;
+
+        try {
+            mllpSocketWriter.writeEnvelopedPayload(TEST_MESSAGE.getBytes(), TEST_ACKNOWLEDGEMENT.getBytes());
+        } catch (MllpAcknowledgementDeliveryException expectedEx) {
+            verifyException(expectedEx);
+            throw expectedEx;
+        }
+    }
+    /** A socket flagged as closed must cause a fully populated delivery exception. */
+    @Test(expected = MllpAcknowledgementDeliveryException.class)
+    public void testWriteToClosedSocket() throws Exception {
+        fakeSocket.closed = true;
+
+        try {
+            mllpSocketWriter.writeEnvelopedPayload(TEST_MESSAGE.getBytes(), TEST_ACKNOWLEDGEMENT.getBytes());
+        } catch (MllpAcknowledgementDeliveryException expectedEx) {
+            verifyException(expectedEx);
+            throw expectedEx;
+        }
+    }
+    /** A failure injected on the START_OF_BLOCK byte must surface as a fully populated delivery exception. */
+    @Test(expected = MllpAcknowledgementDeliveryException.class)
+    public void testWriteStartOfBlockFailure() throws Exception {
+        fakeSocket.fakeSocketOutputStream.writeFailOn = Byte.valueOf((byte) START_OF_BLOCK);
+
+        try {
+            mllpSocketWriter.writeEnvelopedPayload(TEST_MESSAGE.getBytes(), TEST_ACKNOWLEDGEMENT.getBytes());
+        } catch (MllpAcknowledgementDeliveryException expectedEx) {
+            verifyException(expectedEx);
+            throw expectedEx;
+        }
+    }
+    /** A failure injected on the acknowledgement byte array must surface as a fully populated delivery exception. */
+    @Test(expected = MllpAcknowledgementDeliveryException.class)
+    public void testWriteAcknowledgementFailure() throws Exception {
+        fakeSocket.fakeSocketOutputStream.writeArrayFailOn = TEST_ACKNOWLEDGEMENT.getBytes();
+
+        try {
+            mllpSocketWriter.writeEnvelopedPayload(TEST_MESSAGE.getBytes(), TEST_ACKNOWLEDGEMENT.getBytes());
+        } catch (MllpAcknowledgementDeliveryException expectedEx) {
+            verifyException(expectedEx);
+            throw expectedEx;
+        }
+    }
+    /** A failure injected on PAYLOAD_TERMINATOR must surface as a fully populated delivery exception. */
+    @Test(expected = MllpAcknowledgementDeliveryException.class)
+    public void testWriteEndOfMessageFailure() throws Exception {
+        fakeSocket.fakeSocketOutputStream.writeArrayFailOn = PAYLOAD_TERMINATOR;
+
+        try {
+            mllpSocketWriter.writeEnvelopedPayload(TEST_MESSAGE.getBytes(), TEST_ACKNOWLEDGEMENT.getBytes());
+        } catch (MllpAcknowledgementDeliveryException expectedEx) {
+            verifyException(expectedEx);
+            throw expectedEx;
+        }
+    }
+    /** Checks the exception has a text and carries the test message, the acknowledgement, and the acknowledgement as MLLP payload. */
+    private void verifyException(MllpAcknowledgementDeliveryException expectedEx) throws Exception {
+        assertNotNull(expectedEx.getMessage());
+        assertArrayEquals(TEST_MESSAGE.getBytes(), expectedEx.getHl7Message());
+        assertArrayEquals(TEST_ACKNOWLEDGEMENT.getBytes(), expectedEx.getHl7Acknowledgement());
+        assertArrayEquals(TEST_ACKNOWLEDGEMENT.getBytes(), expectedEx.getMllpPayload());
+    }
+
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/a53540da/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/impl/MllpSocketMessageReaderTest.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/impl/MllpSocketMessageReaderTest.java b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/impl/MllpSocketMessageReaderTest.java
new file mode 100644
index 0000000..0075c33
--- /dev/null
+++ b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/impl/MllpSocketMessageReaderTest.java
@@ -0,0 +1,527 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mllp.impl;
+
+import java.io.IOException;
+import java.net.SocketException;
+import java.net.SocketTimeoutException;
+
+import org.apache.camel.component.mllp.MllpException;
+import org.apache.camel.component.mllp.MllpReceiveException;
+import org.apache.camel.component.mllp.MllpTimeoutException;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.camel.component.mllp.MllpEndpoint.SEGMENT_DELIMITER;
+import static org.apache.camel.component.mllp.MllpEndpoint.START_OF_BLOCK;
+
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.sameInstance;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+
+public class MllpSocketMessageReaderTest extends MllpSocketReaderTestSupport {
+    MllpSocketReader mllpSocketReader;
+    /** Builds a fresh reader over fakeSocket; 5000/1000 are presumably timeouts and {@code false} message mode (confirm). */
+    @Before
+    public void setUp() throws Exception {
+        assertSocketOpen();
+        mllpSocketReader = new MllpSocketReader(fakeSocket, 5000, 1000, false);
+    }
+    /** Envelope and payload queued through a single addPacket call must yield exactly the payload. */
+    @Test
+    public void testReadMessage() throws Exception {
+        byte[] expected = TEST_MESSAGE.getBytes();
+
+        fakeSocket.fakeSocketInputStream.addPacket(START_PACKET, expected, END_PACKET);
+
+        byte[] actual = mllpSocketReader.readEnvelopedPayload();
+
+        assertArrayEquals(expected, actual);
+        assertSocketOpen();
+    }
+    /** Envelope parts and payload queued through addPackets must still yield exactly the payload. */
+    @Test
+    public void testReadMessageWithSeparateEnvelopingAndMessagePackets() throws Exception {
+        byte[] expected = TEST_MESSAGE.getBytes();
+
+        fakeSocket.fakeSocketInputStream.addPackets(START_PACKET, TEST_MESSAGE.getBytes(), END_PACKET);
+
+        byte[] actual = mllpSocketReader.readEnvelopedPayload();
+
+        assertArrayEquals(expected, actual);
+        assertSocketOpen();
+    }
+    /** Payload queued via addPackets(TEST_MESSAGE, SEGMENT_DELIMITER) (presumably one packet per segment) must be reassembled. */
+    @Test
+    public void testReadMessageWithMultipleMessagePackets() throws Exception {
+        byte[] expected = TEST_MESSAGE.getBytes();
+
+        fakeSocket.fakeSocketInputStream
+                .addPacket(START_PACKET)
+                .addPackets(TEST_MESSAGE, SEGMENT_DELIMITER)
+                .addPacket(END_PACKET);
+
+        byte[] actual = mllpSocketReader.readEnvelopedPayload();
+
+        assertArrayEquals(expected, actual);
+        assertSocketOpen();
+    }
+    /** An envelope with nothing inside, queued through one addPacket call, must yield a zero-length array. */
+    @Test
+    public void testReadEmptyMessage() throws Exception {
+        byte[] expected = new byte[0];
+        fakeSocket.fakeSocketInputStream.addPacket(START_PACKET, END_PACKET);
+
+        byte[] actual = mllpSocketReader.readEnvelopedPayload();
+
+        assertArrayEquals(expected, actual);
+        assertSocketOpen();
+    }
+    /** An envelope with nothing inside, queued through addPackets, must yield a zero-length array. */
+    @Test
+    public void testReadEmptyMessageWithSeparateEnvelopingPackets() throws Exception {
+        byte[] expected = new byte[0];
+        fakeSocket.fakeSocketInputStream.addPackets(START_PACKET, END_PACKET);
+
+        byte[] actual = mllpSocketReader.readEnvelopedPayload();
+
+        assertArrayEquals(expected, actual);
+        assertSocketOpen();
+    }
+    /** With fakeSocketInputStream set to null the read must fail with an empty MllpReceiveException caused by an IOException. */
+    @Test
+    public void testGetInputStreamFailure() throws Exception {
+        fakeSocket.fakeSocketInputStream = null;
+
+        try {
+            mllpSocketReader.readEnvelopedPayload();
+
+            expectedExceptionFailure(MllpReceiveException.class);
+        } catch (MllpReceiveException expectedEx) {
+            assertEmptyExpectedException(expectedEx);
+            assertNotNull(expectedEx.getCause());
+            assertThat(expectedEx.getCause(), instanceOf(IOException.class));
+            assertSocketReset();
+        }
+    }
+    /** An EMPTY_PACKET queued first must raise an empty, cause-less MllpReceiveException and pass assertSocketReset. */
+    @Test
+    public void testEndOfStreamOnInitialRead() throws Exception {
+        fakeSocket.fakeSocketInputStream.addPackets(EMPTY_PACKET, TEST_MESSAGE);
+
+        try {
+            mllpSocketReader.readEnvelopedPayload();
+
+            expectedExceptionFailure(MllpReceiveException.class);
+        } catch (MllpReceiveException expectedEx) {
+            assertEmptyExpectedException(expectedEx);
+            assertNull(expectedEx.getCause());
+            assertSocketReset();
+        }
+    }
+    /** With no packets queued the read must return null and leave the socket open. */
+    @Test
+    public void testTimeoutOnInitialRead() throws Exception {
+        byte[] actual = mllpSocketReader.readEnvelopedPayload();
+
+        assertNull(actual);
+        assertSocketOpen();
+    }
+    /** Only START_OF_BLOCK queued: expects an empty MllpTimeoutException whose cause is exactly SocketTimeoutException. */
+    @Test
+    public void testTimeoutOnInitialReadWithStartOfBlock() throws Exception {
+        fakeSocket.fakeSocketInputStream.addPacket(START_OF_BLOCK);
+
+        try {
+            mllpSocketReader.readEnvelopedPayload();
+
+            expectedExceptionFailure(MllpTimeoutException.class);
+        } catch (MllpTimeoutException expectedEx) {
+            assertEmptyExpectedException(expectedEx);
+            assertNotNull(expectedEx.getCause());
+            assertThat(expectedEx.getCause().getClass(), sameInstance((Class)SocketTimeoutException.class));
+            assertSocketReset();
+        }
+    }
+    /** EXCEPTION_PACKET queued first: expects an empty MllpReceiveException whose cause is exactly SocketException. */
+    @Test
+    public void testSocketExceptionOnInitialRead() throws Exception {
+        fakeSocket.fakeSocketInputStream.addPacket(EXCEPTION_PACKET);
+
+        try {
+            mllpSocketReader.readEnvelopedPayload();
+
+            expectedExceptionFailure(MllpReceiveException.class);
+        } catch (MllpReceiveException expectedEx) {
+            assertEmptyExpectedException(expectedEx);
+            assertNotNull(expectedEx.getCause());
+            assertThat(expectedEx.getCause().getClass(), sameInstance((Class)SocketException.class));
+            assertSocketReset();
+        }
+    }
+    /** As above with useSocketExceptionOnNullPacket disabled: the cause must be exactly IOException. */
+    @Test
+    public void testIOExceptionOnInitialRead() throws Exception {
+        fakeSocket.fakeSocketInputStream.useSocketExceptionOnNullPacket = false;
+        fakeSocket.fakeSocketInputStream.addPacket(EXCEPTION_PACKET);
+
+        try {
+            mllpSocketReader.readEnvelopedPayload();
+
+            expectedExceptionFailure(MllpReceiveException.class);
+        } catch (MllpReceiveException expectedEx) {
+            assertEmptyExpectedException(expectedEx);
+            assertNotNull(expectedEx.getCause());
+            assertThat(expectedEx.getCause().getClass(), sameInstance((Class)IOException.class));
+            assertSocketReset();
+        }
+    }
+    /** START_PACKET followed by EMPTY_PACKET: expects an empty, cause-less MllpReceiveException. */
+    @Test
+    public void testEndOfStreamOnFirstAdditionalRead() throws Exception {
+        fakeSocket.fakeSocketInputStream.addPackets(START_PACKET, EMPTY_PACKET);
+
+        try {
+            mllpSocketReader.readEnvelopedPayload();
+
+            expectedExceptionFailure(MllpReceiveException.class);
+        } catch (MllpReceiveException expectedEx) {
+            assertEmptyExpectedException(expectedEx);
+            assertNull(expectedEx.getCause());
+            assertSocketReset();
+        }
+    }
+    /** Start plus payload in one addPacket call, then EMPTY_PACKET: the cause-less exception must carry the payload read so far. */
+    @Test
+    public void testEndOfStreamOnFirstAdditionalReadWithPartialPayload() throws Exception {
+        fakeSocket.fakeSocketInputStream.addPacket(START_PACKET, TEST_MESSAGE.getBytes()).addPacket(EMPTY_PACKET);
+
+        try {
+            mllpSocketReader.readEnvelopedPayload();
+
+            expectedExceptionFailure(MllpReceiveException.class);
+        } catch (MllpReceiveException expectedEx) {
+            assertExpectedException(expectedEx);
+            assertNull(expectedEx.getCause());
+            assertSocketReset();
+        }
+    }
+    /** Only START_PACKET queued via addPackets: expects an empty MllpTimeoutException caused by exactly SocketTimeoutException. */
+    @Test
+    public void testTimeoutOnFirstAdditionalRead() throws Exception {
+        fakeSocket.fakeSocketInputStream.addPackets(START_PACKET);
+
+        try {
+            mllpSocketReader.readEnvelopedPayload();
+
+            expectedExceptionFailure(MllpTimeoutException.class);
+        } catch (MllpTimeoutException expectedEx) {
+            assertEmptyExpectedException(expectedEx);
+            assertNotNull(expectedEx.getCause());
+            assertThat(expectedEx.getCause().getClass(), sameInstance((Class)SocketTimeoutException.class));
+            assertSocketReset();
+        }
+    }
+    /** Start plus payload in one addPacket call and nothing after: the timeout exception must carry the payload read so far. */
+    @Test
+    public void testTimeoutOnFirstAdditionalReadWithPartialPayload() throws Exception {
+        fakeSocket.fakeSocketInputStream.addPacket(START_PACKET, TEST_MESSAGE.getBytes());
+
+        try {
+            mllpSocketReader.readEnvelopedPayload();
+
+            expectedExceptionFailure(MllpTimeoutException.class);
+        } catch (MllpTimeoutException expectedEx) {
+            assertExpectedException(expectedEx);
+            assertNotNull(expectedEx.getCause());
+            assertThat(expectedEx.getCause().getClass(), sameInstance((Class)SocketTimeoutException.class));
+            assertSocketReset();
+        }
+    }
+    /** START_PACKET followed by EXCEPTION_PACKET: expects an empty MllpReceiveException caused by exactly SocketException. */
+    @Test
+    public void testSocketExceptionOnFirstAdditionalRead() throws Exception {
+        fakeSocket.fakeSocketInputStream.addPackets(START_PACKET, EXCEPTION_PACKET);
+
+        try {
+            mllpSocketReader.readEnvelopedPayload();
+
+            expectedExceptionFailure(MllpReceiveException.class);
+        } catch (MllpReceiveException expectedEx) {
+            assertEmptyExpectedException(expectedEx);
+            assertNotNull(expectedEx.getCause());
+            assertThat(expectedEx.getCause().getClass(), sameInstance((Class)SocketException.class));
+            assertSocketReset();
+        }
+    }
+    /** Start plus payload in one addPacket call, then EXCEPTION_PACKET: the SocketException-caused exception must carry the payload. */
+    @Test
+    public void testSocketExceptionOnFirstAdditionalReadWithPartialPayload() throws Exception {
+        fakeSocket.fakeSocketInputStream.addPacket(START_PACKET, TEST_MESSAGE.getBytes()).addPacket(EXCEPTION_PACKET);
+
+        try {
+            mllpSocketReader.readEnvelopedPayload();
+
+            expectedExceptionFailure(MllpReceiveException.class);
+        } catch (MllpReceiveException expectedEx) {
+            assertExpectedException(expectedEx);
+            assertNotNull(expectedEx.getCause());
+            assertThat(expectedEx.getCause().getClass(), sameInstance((Class)SocketException.class));
+            assertSocketReset();
+        }
+    }
+    /** START_PACKET then EXCEPTION_PACKET with useSocketExceptionOnNullPacket disabled: the cause must be exactly IOException. */
+    @Test
+    public void testIOExceptionOnFirstAdditionalRead() throws Exception {
+        fakeSocket.fakeSocketInputStream.useSocketExceptionOnNullPacket = false;
+        fakeSocket.fakeSocketInputStream.addPackets(START_PACKET, EXCEPTION_PACKET);
+
+        try {
+            mllpSocketReader.readEnvelopedPayload();
+
+            expectedExceptionFailure(MllpReceiveException.class);
+        } catch (MllpReceiveException expectedEx) {
+            assertEmptyExpectedException(expectedEx);
+            assertNotNull(expectedEx.getCause());
+            assertThat(expectedEx.getCause().getClass(), sameInstance((Class)IOException.class));
+            assertSocketReset();
+        }
+    }
+    /** Start plus payload in one addPacket call, then EXCEPTION_PACKET with the flag disabled: IOException cause, payload carried. */
+    @Test
+    public void testIOExceptionOnFirstAdditionalReadWithPartialPayload() throws Exception {
+        fakeSocket.fakeSocketInputStream.useSocketExceptionOnNullPacket = false;
+        fakeSocket.fakeSocketInputStream.addPacket(START_PACKET, TEST_MESSAGE.getBytes()).addPacket(EXCEPTION_PACKET);
+
+        try {
+            mllpSocketReader.readEnvelopedPayload();
+
+            expectedExceptionFailure(MllpReceiveException.class);
+        } catch (MllpReceiveException expectedEx) {
+            assertExpectedException(expectedEx);
+            assertNotNull(expectedEx.getCause());
+            assertThat(expectedEx.getCause().getClass(), sameInstance((Class)IOException.class));
+            assertSocketReset();
+        }
+    }
+    /** Start, payload and EMPTY_PACKET queued through addPackets: the cause-less exception must carry the payload. */
+    @Test
+    public void testEndOfStreamOnSecondAdditionalReadWithPartialPayload() throws Exception {
+        fakeSocket.fakeSocketInputStream.addPackets(START_PACKET, TEST_MESSAGE.getBytes(), EMPTY_PACKET);
+
+        try {
+            mllpSocketReader.readEnvelopedPayload();
+
+            expectedExceptionFailure(MllpReceiveException.class);
+        } catch (MllpReceiveException expectedEx) {
+            assertExpectedException(expectedEx);
+            assertNull(expectedEx.getCause());
+            assertSocketReset();
+        }
+    }
+    /** Start and payload queued through addPackets and nothing after: the timeout exception must carry the payload. */
+    @Test
+    public void testTimeoutOnSecondAdditionalReadWithPartialPayload() throws Exception {
+        fakeSocket.fakeSocketInputStream.addPackets(START_PACKET, TEST_MESSAGE.getBytes());
+
+        try {
+            mllpSocketReader.readEnvelopedPayload();
+
+            expectedExceptionFailure(MllpTimeoutException.class);
+        } catch (MllpTimeoutException expectedEx) {
+            assertExpectedException(expectedEx);
+            assertNotNull(expectedEx.getCause());
+            assertThat(expectedEx.getCause().getClass(), sameInstance((Class)SocketTimeoutException.class));
+            assertSocketReset();
+        }
+    }
+    /** Start, payload and EXCEPTION_PACKET queued through addPackets: SocketException cause, payload carried. */
+    @Test
+    public void testSocketExceptionOnSecondAdditionalReadWithPartialPayload() throws Exception {
+        fakeSocket.fakeSocketInputStream.addPackets(START_PACKET, TEST_MESSAGE.getBytes(), EXCEPTION_PACKET);
+
+        try {
+            mllpSocketReader.readEnvelopedPayload();
+
+            expectedExceptionFailure(MllpReceiveException.class);
+        } catch (MllpReceiveException expectedEx) {
+            assertExpectedException(expectedEx);
+            assertNotNull(expectedEx.getCause());
+            assertThat(expectedEx.getCause().getClass(), sameInstance((Class)SocketException.class));
+            assertSocketReset();
+        }
+    }
+    /** Start, payload and EXCEPTION_PACKET queued through addPackets with the flag disabled: IOException cause, payload carried. */
+    @Test
+    public void testIOExceptionOnSecondAdditionalReadWithPartialPayload() throws Exception {
+        fakeSocket.fakeSocketInputStream.useSocketExceptionOnNullPacket = false;
+        fakeSocket.fakeSocketInputStream.addPackets(START_PACKET, TEST_MESSAGE.getBytes(), EXCEPTION_PACKET);
+
+        try {
+            mllpSocketReader.readEnvelopedPayload();
+
+            expectedExceptionFailure(MllpReceiveException.class);
+        } catch (MllpReceiveException expectedEx) {
+            assertExpectedException(expectedEx);
+            assertNotNull(expectedEx.getCause());
+            assertThat(expectedEx.getCause().getClass(), sameInstance((Class)IOException.class));
+            assertSocketReset();
+        }
+    }
+    /** "Junk" bytes ahead of the envelope in the same addPacket call must be dropped; the socket must stay open. */
+    @Test
+    public void testLeadingOutOfBandBytes() throws Exception {
+        byte[] expected = TEST_MESSAGE.getBytes();
+        fakeSocket.fakeSocketInputStream.addPacket("Junk".getBytes(), START_PACKET, expected, END_PACKET);
+
+        byte[] actual = mllpSocketReader.readEnvelopedPayload();
+
+        assertArrayEquals(expected, actual);
+        assertSocketOpen();
+    }
+    /** "Junk" bytes ahead of an empty envelope in the same addPacket call must still yield a zero-length array. */
+    @Test
+    public void testLeadingOutOfBandBytesWithEmptyMessage() throws Exception {
+        byte[] expected = new byte[0];
+        fakeSocket.fakeSocketInputStream.addPacket("Junk".getBytes(), START_PACKET, END_PACKET);
+
+        byte[] actual = mllpSocketReader.readEnvelopedPayload();
+
+        assertArrayEquals(expected, actual);
+        assertSocketOpen();
+    }
+    /** "Junk" and an empty envelope queued through addPackets must still yield a zero-length array. */
+    @Test
+    public void testLeadingOutOfBandBytesWithEmptyMessageWithSeparateEnvelopingPackets() throws Exception {
+        byte[] expected = new byte[0];
+        fakeSocket.fakeSocketInputStream.addPackets("Junk".getBytes(), START_PACKET, END_PACKET);
+
+        byte[] actual = mllpSocketReader.readEnvelopedPayload();
+
+        assertArrayEquals(expected, actual);
+        assertSocketOpen();
+    }
+    /** "Junk", envelope parts and payload queued through addPackets must yield exactly the payload. */
+    @Test
+    public void testLeadingOutOfBandBytesSeparateEnvelopingAndMessagePackets() throws Exception {
+        byte[] expected = TEST_MESSAGE.getBytes();
+
+        fakeSocket.fakeSocketInputStream.addPackets("Junk".getBytes(), START_PACKET, TEST_MESSAGE.getBytes(), END_PACKET);
+
+        byte[] actual = mllpSocketReader.readEnvelopedPayload();
+
+        assertArrayEquals(expected, actual);
+        assertSocketOpen();
+    }
+    /** "Junk" ahead of an envelope whose payload is queued via addPackets(TEST_MESSAGE, SEGMENT_DELIMITER) must be dropped. */
+    @Test
+    public void testLeadingOutOfBandBytesWithMultipleMessagePackets() throws Exception {
+        byte[] expected = TEST_MESSAGE.getBytes();
+
+        fakeSocket.fakeSocketInputStream
+                .addPacket("Junk")
+                .addPacket(START_PACKET)
+                .addPackets(TEST_MESSAGE, SEGMENT_DELIMITER)
+                .addPacket(END_PACKET);
+
+        byte[] actual = mllpSocketReader.readEnvelopedPayload();
+
+        assertArrayEquals(expected, actual);
+        assertSocketOpen();
+    }
+    /** "Junk" bytes after the envelope in the same addPacket call must not leak into the result; the socket must stay open. */
+    @Test
+    public void testTrailingOutOfBandBytes() throws Exception {
+        byte[] expected = TEST_MESSAGE.getBytes();
+        fakeSocket.fakeSocketInputStream.addPacket(START_PACKET, expected, END_PACKET, "Junk".getBytes());
+
+        byte[] actual = mllpSocketReader.readEnvelopedPayload();
+
+        assertArrayEquals(expected, actual);
+        assertSocketOpen();
+    }
+    /** "Junk" bytes after an empty envelope in the same addPacket call must still yield a zero-length array. */
+    @Test
+    public void testTrailingOutOfBandBytesWithEmptyMessage() throws Exception {
+        byte[] expected = new byte[0];
+        fakeSocket.fakeSocketInputStream.addPacket(START_PACKET, END_PACKET, "Junk".getBytes());
+
+        byte[] actual = mllpSocketReader.readEnvelopedPayload();
+
+        assertArrayEquals(expected, actual);
+        assertSocketOpen();
+    }
+    /** An empty envelope followed by "Junk", queued through addPackets, must still yield a zero-length array. */
+    @Test
+    public void testTrailingOutOfBandBytesWithEmptyMessageWithSeparateEnvelopingPackets() throws Exception {
+        byte[] expected = new byte[0];
+        fakeSocket.fakeSocketInputStream.addPackets(START_PACKET, END_PACKET, "Junk".getBytes());
+
+        byte[] actual = mllpSocketReader.readEnvelopedPayload();
+
+        assertArrayEquals(expected, actual);
+        assertSocketOpen();
+    }
+    /** Envelope parts and payload followed by "Junk", queued through addPackets, must yield exactly the payload. */
+    @Test
+    public void testTrailingOutOfBandBytesSeparateEnvelopingAndMessagePackets() throws Exception {
+        byte[] expected = TEST_MESSAGE.getBytes();
+
+        fakeSocket.fakeSocketInputStream.addPackets(START_PACKET, TEST_MESSAGE.getBytes(), END_PACKET, "Junk".getBytes());
+
+        byte[] actual = mllpSocketReader.readEnvelopedPayload();
+
+        assertArrayEquals(expected, actual);
+        assertSocketOpen();
+    }
+    /** "Junk" after an envelope whose payload is queued via addPackets(TEST_MESSAGE, SEGMENT_DELIMITER) must not leak into the result. */
+    @Test
+    public void testTrailingOutOfBandBytesWithMultipleMessagePackets() throws Exception {
+        byte[] expected = TEST_MESSAGE.getBytes();
+
+        fakeSocket.fakeSocketInputStream
+                .addPacket(START_PACKET)
+                .addPackets(TEST_MESSAGE, SEGMENT_DELIMITER)
+                .addPacket(END_PACKET)
+                .addPacket("Junk");
+
+        byte[] actual = mllpSocketReader.readEnvelopedPayload();
+
+        assertArrayEquals(expected, actual);
+        assertSocketOpen();
+    }
+    /** Checks the exception has a text but carries no HL7 message, acknowledgement or MLLP payload. */
+    private void assertEmptyExpectedException(MllpException expectedEx) {
+        assertNotNull(expectedEx);
+        assertNotNull(expectedEx.getMessage());
+        assertNull(expectedEx.getHl7Message());
+        assertNull(expectedEx.getHl7Acknowledgement());
+        assertNull(expectedEx.getMllpPayload());
+    }
+    /** Checks the exception has a text and carries TEST_MESSAGE as both HL7 message and MLLP payload, with no acknowledgement. */
+    private void assertExpectedException(MllpException expectedEx) {
+        assertNotNull(expectedEx);
+        assertNotNull(expectedEx.getMessage());
+        assertArrayEquals(TEST_MESSAGE.getBytes(), expectedEx.getHl7Message());
+        assertNull(expectedEx.getHl7Acknowledgement());
+        assertArrayEquals(TEST_MESSAGE.getBytes(), expectedEx.getMllpPayload());
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/a53540da/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/impl/MllpSocketMessageWriterTest.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/impl/MllpSocketMessageWriterTest.java b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/impl/MllpSocketMessageWriterTest.java
new file mode 100644
index 0000000..8e02bf9
--- /dev/null
+++ b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/impl/MllpSocketMessageWriterTest.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mllp.impl;
+
+import org.apache.camel.component.mllp.MllpWriteException;
+import org.apache.camel.test.util.PayloadBuilder;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.camel.component.mllp.MllpEndpoint.END_OF_BLOCK;
+import static org.apache.camel.component.mllp.MllpEndpoint.END_OF_DATA;
+import static org.apache.camel.component.mllp.MllpEndpoint.START_OF_BLOCK;
+import static org.apache.camel.component.mllp.impl.MllpSocketWriter.PAYLOAD_TERMINATOR;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+public class MllpSocketMessageWriterTest extends MllpSocketWriterTestSupport {
+    MllpSocketWriter mllpSocketWriter;
+    /** Builds a fresh writer over fakeSocket; the {@code false} flag presumably selects message mode (confirm). */
+    @Before
+    public void setUp() throws Exception {
+        mllpSocketWriter = new MllpSocketWriter(fakeSocket, false);
+    }
+    /** The message must appear on the socket inside the MLLP envelope. */
+    @Test
+    public void testWriteMessage() throws Exception {
+        byte[] expected = PayloadBuilder.build(START_OF_BLOCK, TEST_MESSAGE, END_OF_BLOCK, END_OF_DATA);
+
+        mllpSocketWriter.writeEnvelopedPayload(TEST_MESSAGE.getBytes(), null);
+
+        assertArrayEquals(expected, fakeSocket.payload());
+    }
+    /** A null message must still produce the bare envelope (start-of-block, end-of-block, end-of-data). */
+    @Test
+    public void testWriteNullMessage() throws Exception {
+        byte[] message = null;
+
+        byte[] expected = PayloadBuilder.build(START_OF_BLOCK, END_OF_BLOCK, END_OF_DATA);
+
+        mllpSocketWriter.writeEnvelopedPayload(message, null);
+
+        assertArrayEquals(expected, fakeSocket.payload());
+    }
+    /** A zero-length message must produce the same bare envelope as a null one. */
+    @Test
+    public void testWriteEmptyMessage() throws Exception {
+        byte[] message = new byte[0];
+
+        byte[] expected = PayloadBuilder.build(START_OF_BLOCK, END_OF_BLOCK, END_OF_DATA);
+
+        mllpSocketWriter.writeEnvelopedPayload(message, null);
+
+        assertArrayEquals(expected, fakeSocket.payload());
+    }
+    /** With fakeSocketOutputStream set to null the write must fail with a fully populated MllpWriteException. */
+    @Test(expected = MllpWriteException.class)
+    public void testGetOutputStreamFailure() throws Exception {
+        fakeSocket.fakeSocketOutputStream = null;
+
+        try {
+            mllpSocketWriter.writeEnvelopedPayload(TEST_MESSAGE.getBytes(), null);
+        } catch (MllpWriteException expectedEx) {
+            verifyException(expectedEx);
+            throw expectedEx;
+        }
+    }
+    /** A socket flagged as not connected must cause a fully populated MllpWriteException. */
+    @Test(expected = MllpWriteException.class)
+    public void testWriteToUnconnectedSocket() throws Exception {
+        fakeSocket.connected = false;
+
+        try {
+            mllpSocketWriter.writeEnvelopedPayload(TEST_MESSAGE.getBytes(), null);
+        } catch (MllpWriteException expectedEx) {
+            verifyException(expectedEx);
+            throw expectedEx;
+        }
+    }
+    /** A socket flagged as closed must cause a fully populated MllpWriteException. */
+    @Test(expected = MllpWriteException.class)
+    public void testWriteToClosedSocket() throws Exception {
+        fakeSocket.closed = true;
+
+        try {
+            mllpSocketWriter.writeEnvelopedPayload(TEST_MESSAGE.getBytes(), null);
+        } catch (MllpWriteException expectedEx) {
+            verifyException(expectedEx);
+            throw expectedEx;
+        }
+    }
+    /** A failure injected on the START_OF_BLOCK byte must surface as a fully populated MllpWriteException. */
+    @Test(expected = MllpWriteException.class)
+    public void testWriteStartOfBlockFailure() throws Exception {
+        fakeSocket.fakeSocketOutputStream.writeFailOn = Byte.valueOf((byte) START_OF_BLOCK);
+
+        try {
+            mllpSocketWriter.writeEnvelopedPayload(TEST_MESSAGE.getBytes(), null);
+        } catch (MllpWriteException expectedEx) {
+            verifyException(expectedEx);
+            throw expectedEx;
+        }
+    }
+    /** A failure injected on the message byte array must surface as a fully populated MllpWriteException. */
+    @Test(expected = MllpWriteException.class)
+    public void testWriteMessageFailure() throws Exception {
+        fakeSocket.fakeSocketOutputStream.writeArrayFailOn = TEST_MESSAGE.getBytes();
+
+        try {
+            mllpSocketWriter.writeEnvelopedPayload(TEST_MESSAGE.getBytes(), null);
+        } catch (MllpWriteException expectedEx) {
+            verifyException(expectedEx);
+            throw expectedEx;
+        }
+    }
+    /** A failure injected on PAYLOAD_TERMINATOR must surface as a fully populated MllpWriteException. */
+    @Test(expected = MllpWriteException.class)
+    public void testWriteEndOfMessageFailure() throws Exception {
+        fakeSocket.fakeSocketOutputStream.writeArrayFailOn = PAYLOAD_TERMINATOR;
+
+        try {
+            mllpSocketWriter.writeEnvelopedPayload(TEST_MESSAGE.getBytes(), null);
+        } catch (MllpWriteException expectedEx) {
+            verifyException(expectedEx);
+            throw expectedEx;
+        }
+    }
+    /** Checks the exception has a text and carries TEST_MESSAGE as both HL7 message and MLLP payload, with no acknowledgement. */
+    private void verifyException(MllpWriteException expectedEx) throws Exception {
+        assertNotNull(expectedEx.getMessage());
+        assertArrayEquals(TEST_MESSAGE.getBytes(), expectedEx.getHl7Message());
+        assertNull(expectedEx.getHl7Acknowledgement());
+        assertArrayEquals(TEST_MESSAGE.getBytes(), expectedEx.getMllpPayload());
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/a53540da/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/impl/MllpSocketReaderTestSupport.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/impl/MllpSocketReaderTestSupport.java b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/impl/MllpSocketReaderTestSupport.java
new file mode 100644
index 0000000..11a4cfe
--- /dev/null
+++ b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/impl/MllpSocketReaderTestSupport.java
@@ -0,0 +1,342 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mllp.impl;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.Socket;
+import java.net.SocketException;
+import java.net.SocketTimeoutException;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.StringTokenizer;
+
+import org.apache.camel.test.util.PayloadBuilder;
+
+import static org.apache.camel.component.mllp.MllpEndpoint.END_OF_BLOCK;
+import static org.apache.camel.component.mllp.MllpEndpoint.END_OF_DATA;
+import static org.apache.camel.component.mllp.MllpEndpoint.START_OF_BLOCK;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Shared fixtures for the MLLP socket reader tests: sample HL7 payloads, canned packets, socket state
+ * assertions and a fake socket whose input stream replays queued packets.
+ */
+public abstract class MllpSocketReaderTestSupport {
+    /** Sample HL7 ADT^A08 message; each segment ends with a carriage return and the message ends with a line feed. */
+    static final String TEST_MESSAGE =
+        "MSH|^~\\&|ADT|EPIC|JCAPS|CC|20161206193919|RISTECH|ADT^A08|00001|D|2.3^^|||||||" + '\r'
+            + "EVN|A08|20150107161440||REG_UPDATE_SEND_VISIT_MESSAGES_ON_PATIENT_CHANGES|RISTECH^RADIOLOGY^TECHNOLOGIST^^^^^^UCLA^^^^^RRMC||" + '\r'
+            + "PID|1|2100355^^^MRN^MRN|2100355^^^MRN^MRN||MDCLS9^MC9||19700109|F||U|111 HOVER STREET^^LOS ANGELES^CA^90032^USA^P^^LOS ANGELE|LOS ANGELE|"
+                + "(310)725-6952^P^PH^^^310^7256952||ENGLISH|U||60000013647|565-33-2222|||U||||||||N||" + '\r'
+            + "PD1|||UCLA HEALTH SYSTEM^^10|10002116^ADAMS^JOHN^D^^^^^EPIC^^^^PROVID||||||||||||||" + '\r'
+            + "NK1|1|DOE^MC9^^|OTH|^^^^^USA|(310)888-9999^^^^^310^8889999|(310)999-2222^^^^^310^9992222|Emergency Contact 1|||||||||||||||||||||||||||" + '\r'
+            + "PV1|1|OUTPATIENT|RR CT^^^1000^^^^^^^DEPID|EL|||017511^TOBIAS^JONATHAN^^^^^^EPIC^^^^PROVID|017511^TOBIAS^JONATHAN^^^^^^EPIC^^^^PROVID||||||"
+                + "CLR|||||60000013647|SELF|||||||||||||||||||||HOV_CONF|^^^1000^^^^^^^||20150107161438||||||||||" + '\r'
+            + "PV2||||||||20150107161438||||CT BRAIN W WO CONTRAST||||||||||N|||||||||||||||||||||||||||" + '\r'
+            + "ZPV||||||||||||20150107161438|||||||||" + '\r'
+            + "AL1|1||33361^NO KNOWN ALLERGIES^^NOTCOMPUTRITION^NO KNOWN ALLERGIES^EXTELG||||||" + '\r'
+            + "DG1|1|DX|784.0^Headache^DX|Headache||VISIT" + '\r'
+            + "GT1|1|1000235129|MDCLS9^MC9^^||111 HOVER STREET^^LOS ANGELES^CA^90032^USA^^^LOS ANGELE|(310)725-6952^^^^^310^7256952||19700109|F|P/F|SLF|"
+                + "565-33-2222|||||^^^^^USA|||UNKNOWN|||||||||||||||||||||||||||||" + '\r'
+            + "UB2||||||||" + '\r'
+            + '\n';
+
+    /** Sample HL7 ACK^A08 acknowledgement (MSH and MSA segments) in the same layout as TEST_MESSAGE. */
+    static final String TEST_ACKNOWLEDGEMENT =
+        "MSH|^~\\&|JCAPS|CC|ADT|EPIC|20161206193919|RISTECH|ACK^A08|00001|D|2.3^^|||||||" + '\r'
+            + "MSA|AA|00001|" + '\r'
+            + '\n';
+
+    /** A null packet: when it reaches the head of the queue, FakeSocketInputStream.read(...) throws. */
+    static final byte[] EXCEPTION_PACKET = null;
+    /** A packet with no bytes. */
+    static final byte[] EMPTY_PACKET = new byte[0];
+    /** A packet built from START_OF_BLOCK only. */
+    static final byte[] START_PACKET = PayloadBuilder.build(START_OF_BLOCK);
+    /** A packet built from END_OF_BLOCK followed by END_OF_DATA. */
+    static final byte[] END_PACKET = PayloadBuilder.build(END_OF_BLOCK, END_OF_DATA);
+
+    FakeSocket fakeSocket = new FakeSocket();
+
+    /**
+     * Asserts that the fake socket is connected, has not been shut down or closed, and still has an
+     * input stream.
+     */
+    void assertSocketOpen() throws Exception {
+        final FakeSocket socket = fakeSocket;
+
+        assertTrue("socket should have been connected", socket.connected);
+        assertFalse("shutdownInput() should not have been called", socket.inputShutdown);
+        assertFalse("shutdownOutput() should not have been called", socket.outputShutdown);
+        assertFalse("close() should not have been called", socket.closed);
+        assertNotNull("socket should have an input stream", socket.fakeSocketInputStream);
+    }
+
+    /**
+     * Asserts that the fake socket was shut down in both directions and closed without SO_LINGER
+     * having been enabled.
+     */
+    void assertSocketClosed() throws Exception {
+        final FakeSocket socket = fakeSocket;
+
+        assertTrue("socket should have been connected", socket.connected);
+        assertTrue("shutdownInput() should have been called", socket.inputShutdown);
+        assertTrue("shutdownOutput() should have been called", socket.outputShutdown);
+        assertTrue("close() should have been called", socket.closed);
+        assertFalse("SO_LINGER should not be enabled", socket.linger);
+    }
+
+    /**
+     * Asserts that the fake socket was closed with SO_LINGER enabled and a linger timeout of 0.
+     */
+    void assertSocketReset() throws Exception {
+        final FakeSocket socket = fakeSocket;
+
+        assertTrue("socket should have been connected", socket.connected);
+        assertTrue("close() should have been called", socket.closed);
+        assertTrue("SO_LINGER should be enabled", socket.linger);
+        assertEquals("SO_LINGER timeout should be 0", 0, socket.lingerTimeout);
+    }
+
+    /**
+     * Fails the current test with a message naming the exception type that should have been thrown.
+     *
+     * @param expected the exception class the test was waiting for
+     */
+    <E extends Exception> void expectedExceptionFailure(Class<E> expected) throws Exception {
+        final String exceptionName = expected.getName();
+
+        fail("Expected exception " + exceptionName + " was not thrown");
+    }
+
+    /**
+     * Socket stand-in that records shutdown/close/option calls in plain fields instead of touching the
+     * network, and hands out a FakeSocketInputStream.
+     */
+    class FakeSocket extends Socket {
+        boolean connected = true;
+        boolean inputShutdown;
+        boolean outputShutdown;
+        boolean closed;
+        int receiveBufferSize = 1024;
+        int sendBufferSize = 1024;
+        int timeout = 1000;
+        boolean linger;
+        int lingerTimeout = 1024;
+        FakeSocketInputStream fakeSocketInputStream = new FakeSocketInputStream();
+
+        FakeSocket() {
+        }
+
+        /** Reports the value of the {@code connected} field. */
+        @Override
+        public boolean isConnected() {
+            return connected;
+        }
+
+        /** Reports whether shutdownInput() has been recorded. */
+        @Override
+        public boolean isInputShutdown() {
+            return inputShutdown;
+        }
+
+        /** Reports whether shutdownOutput() has been recorded. */
+        @Override
+        public boolean isOutputShutdown() {
+            return outputShutdown;
+        }
+
+        /** Reports whether close() has been recorded. */
+        @Override
+        public boolean isClosed() {
+            return closed;
+        }
+
+        /** Records the input shutdown; nothing else happens. */
+        @Override
+        public void shutdownInput() throws IOException {
+            inputShutdown = true;
+        }
+
+        /** Records the output shutdown; nothing else happens. */
+        @Override
+        public void shutdownOutput() throws IOException {
+            outputShutdown = true;
+        }
+
+        /** Records the close; nothing else happens. */
+        @Override
+        public synchronized void close() throws IOException {
+            closed = true;
+        }
+
+        /** Returns the recorded linger timeout while SO_LINGER is on, otherwise -1. */
+        @Override
+        public int getSoLinger() throws SocketException {
+            return linger ? lingerTimeout : -1;
+        }
+
+        /** Records both the SO_LINGER flag and its timeout. */
+        @Override
+        public void setSoLinger(boolean on, int linger) throws SocketException {
+            this.lingerTimeout = linger;
+            this.linger = on;
+        }
+
+        @Override
+        public synchronized int getReceiveBufferSize() throws SocketException {
+            return this.receiveBufferSize;
+        }
+
+        @Override
+        public synchronized void setReceiveBufferSize(int size) throws SocketException {
+            receiveBufferSize = size;
+        }
+
+        @Override
+        public synchronized int getSendBufferSize() throws SocketException {
+            return this.sendBufferSize;
+        }
+
+        @Override
+        public synchronized void setSendBufferSize(int size) throws SocketException {
+            sendBufferSize = size;
+        }
+
+        @Override
+        public synchronized int getSoTimeout() throws SocketException {
+            return this.timeout;
+        }
+
+        @Override
+        public synchronized void setSoTimeout(int timeout) throws SocketException {
+            this.timeout = timeout;
+        }
+
+        /**
+         * Returns the fake input stream; when the {@code fakeSocketInputStream} field has been set to
+         * null an IOException is thrown instead.
+         */
+        @Override
+        public InputStream getInputStream() throws IOException {
+            if (fakeSocketInputStream != null) {
+                return fakeSocketInputStream;
+            }
+
+            throw new IOException("Faking getInputStream failure");
+        }
+
+    }
+
+    /**
+     * Input stream that replays a queue of packets. Every read is served from the packet at the head of
+     * the queue only; a null packet makes the read throw, and an empty queue makes the read throw a
+     * SocketTimeoutException.
+     */
+    class FakeSocketInputStream extends InputStream {
+        boolean useSocketExceptionOnNullPacket = true;
+        private Queue<ByteArrayInputStream> packetQueue = new LinkedList<>();
+
+        FakeSocketInputStream() {
+        }
+
+        @Override
+        public int read() throws IOException {
+            if (packetQueue.isEmpty()) {
+                throw new SocketTimeoutException("Faking Socket read() Timeout");
+            }
+
+            final ByteArrayInputStream head = headOrFail(
+                "Faking Socket read() failure - simulating reset",
+                "Faking Socket read() failure");
+            final int answer = head.read();
+
+            return dropHeadIfDrained(head, answer);
+        }
+
+        @Override
+        public int read(byte[] buffer) throws IOException {
+            if (packetQueue.isEmpty()) {
+                throw new SocketTimeoutException("Faking Socket read(byte[]) Timeout");
+            }
+
+            final ByteArrayInputStream head = headOrFail(
+                "Faking Socket read(byte[]) failure - simulating reset",
+                "Faking Socket read(byte[]) failure");
+            final int answer = head.read(buffer);
+
+            return dropHeadIfDrained(head, answer);
+        }
+
+        @Override
+        public int read(byte[] buffer, int offset, int length) throws IOException {
+            if (packetQueue.isEmpty()) {
+                throw new SocketTimeoutException("Faking Socket read(byte[], int, int) Timeout");
+            }
+
+            final ByteArrayInputStream head = headOrFail(
+                "Faking Socket read(byte[], int, int) failure - simulating reset",
+                "Faking Socket read(byte[], int, int) failure");
+            final int answer = head.read(buffer, offset, length);
+
+            return dropHeadIfDrained(head, answer);
+        }
+
+        /**
+         * Reports the bytes left in the packet at the head of the queue, or 0 when the queue is empty.
+         * NOTE(review): a null (exception) packet at the head makes this throw NullPointerException.
+         */
+        @Override
+        public int available() throws IOException {
+            return packetQueue.isEmpty() ? 0 : packetQueue.element().available();
+        }
+
+        /** Queues one packet built from the given characters. */
+        public FakeSocketInputStream addPacket(char... packet) {
+            final byte[] packetBytes = PayloadBuilder.build(packet);
+            packetQueue.add(new ByteArrayInputStream(packetBytes));
+
+            return this;
+        }
+
+        /** Queues one packet holding the given bytes; a null array queues a null (exception) packet. */
+        public FakeSocketInputStream addPacket(byte[] bytes) throws IOException {
+            packetQueue.add(bytes == null ? null : new ByteArrayInputStream(bytes));
+
+            return this;
+        }
+
+        /** Queues one packet made of the first array followed by each additional array. */
+        public FakeSocketInputStream addPacket(byte[] bytes, byte[]... byteArrays) throws IOException {
+            final PayloadBuilder combined = new PayloadBuilder(bytes);
+            for (int i = 0; i < byteArrays.length; ++i) {
+                combined.append(byteArrays[i]);
+            }
+            packetQueue.add(new ByteArrayInputStream(combined.build()));
+
+            return this;
+        }
+
+        /** Queues one packet built from the given strings. */
+        public FakeSocketInputStream addPacket(String... strings) throws IOException {
+            final byte[] packetBytes = PayloadBuilder.build(strings);
+            packetQueue.add(new ByteArrayInputStream(packetBytes));
+
+            return this;
+        }
+
+        /**
+         * Splits the message on the delimiter and queues every token as its own packet; the delimiters
+         * themselves are returned as tokens too, so they are queued as packets as well.
+         */
+        public FakeSocketInputStream addPackets(String message, char delimiter) throws IOException {
+            final String delimiters = String.valueOf(delimiter);
+            for (StringTokenizer tokens = new StringTokenizer(message, delimiters, true); tokens.hasMoreTokens();) {
+                addPacket(tokens.nextToken());
+            }
+
+            return this;
+        }
+
+        /** Queues each character as a separate packet. */
+        public FakeSocketInputStream addPackets(char... packets) {
+            for (int i = 0; i < packets.length; ++i) {
+                addPacket(packets[i]);
+            }
+
+            return this;
+        }
+
+        /** Queues each byte array as a separate packet. */
+        public FakeSocketInputStream addPackets(byte[]... packets) throws IOException {
+            for (int i = 0; i < packets.length; ++i) {
+                addPacket(packets[i]);
+            }
+
+            return this;
+        }
+
+        /** Queues the bytes as one packet and the string as the next packet. */
+        public FakeSocketInputStream addPackets(byte[] bytes, String s) throws IOException {
+            addPacket(bytes);
+
+            return addPacket(s);
+        }
+
+        /**
+         * Returns the packet at the head of a non-empty queue, or throws when that packet is null:
+         * a SocketException while useSocketExceptionOnNullPacket is set, a plain IOException otherwise.
+         */
+        private ByteArrayInputStream headOrFail(String resetMessage, String failureMessage) throws IOException {
+            final ByteArrayInputStream head = packetQueue.peek();
+            if (head != null) {
+                return head;
+            }
+
+            if (useSocketExceptionOnNullPacket) {
+                throw new SocketException(resetMessage);
+            }
+            throw new IOException(failureMessage);
+        }
+
+        /** Removes the head packet once it hit end-of-stream or has no bytes left; passes the read result through. */
+        private int dropHeadIfDrained(ByteArrayInputStream head, int answer) {
+            if (answer == -1 || head.available() == 0) {
+                packetQueue.remove();
+            }
+
+            return answer;
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/a53540da/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/impl/MllpSocketUtilExceptionHandlingTest.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/impl/MllpSocketUtilExceptionHandlingTest.java b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/impl/MllpSocketUtilExceptionHandlingTest.java
new file mode 100644
index 0000000..543b028
--- /dev/null
+++ b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/impl/MllpSocketUtilExceptionHandlingTest.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mllp.impl;
+
+import java.io.IOException;
+import java.net.Socket;
+import java.net.SocketException;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Calls MllpSocketUtil.close(...) and MllpSocketUtil.reset(...) on a socket whose setSoLinger,
+ * shutdownInput, shutdownOutput and close all throw, and expects each call to return with the socket
+ * reporting itself as closed.
+ */
+public class MllpSocketUtilExceptionHandlingTest {
+    Logger logger = LoggerFactory.getLogger(this.getClass());
+    Socket socket;
+
+    @Before
+    public void setUp() throws Exception {
+        socket = new FakeSocket();
+    }
+
+    @Test
+    public void testClose() throws Exception {
+        MllpSocketUtil.close(socket, null, null);
+
+        assertSocketIsClosed();
+    }
+
+    @Test
+    public void testCloseWithLogger() throws Exception {
+        MllpSocketUtil.close(socket, logger, null);
+
+        assertSocketIsClosed();
+    }
+
+    @Test
+    public void testCloseWithLoggerAndReason() throws Exception {
+        final String reason = "Testing " + this.getClass().getSimpleName() + ".close(...)";
+
+        MllpSocketUtil.close(socket, logger, reason);
+
+        assertSocketIsClosed();
+    }
+
+
+    @Test
+    public void testReset() throws Exception {
+        MllpSocketUtil.reset(socket, null, null);
+
+        assertSocketIsClosed();
+    }
+
+    @Test
+    public void testResetWithLogger() throws Exception {
+        MllpSocketUtil.reset(socket, logger, null);
+
+        assertSocketIsClosed();
+    }
+
+    @Test
+    public void testResetWithLoggerAndReason() throws Exception {
+        final String reason = "Testing " + this.getClass().getSimpleName() + ".reset(...)";
+
+        MllpSocketUtil.reset(socket, logger, reason);
+
+        assertSocketIsClosed();
+    }
+
+
+    // Utility Methods
+
+    /** Shared check: the socket under test must report itself as closed. */
+    private void assertSocketIsClosed() {
+        assertTrue("Socket should be closed", socket.isClosed());
+    }
+
+    /**
+     * Socket stand-in on which every shutdown, linger and close operation throws. close() still marks
+     * the socket as closed before it throws.
+     */
+    class FakeSocket extends Socket {
+        boolean connected = true;
+        boolean closed;
+
+        FakeSocket() {
+        }
+
+        /** Always reports the input side as open, regardless of shutdownInput() calls. */
+        @Override
+        public boolean isInputShutdown() {
+            return false;
+        }
+
+        /** Always reports the output side as open, regardless of shutdownOutput() calls. */
+        @Override
+        public boolean isOutputShutdown() {
+            return false;
+        }
+
+        @Override
+        public void setSoLinger(boolean on, int linger) throws SocketException {
+            throw new SocketException("Faking a setSoLinger failure");
+        }
+
+        @Override
+        public void shutdownInput() throws IOException {
+            throw new IOException("Faking a shutdownInput failure");
+        }
+
+        @Override
+        public void shutdownOutput() throws IOException {
+            throw new IOException("Faking a shutdownOutput failure");
+        }
+
+        @Override
+        public boolean isConnected() {
+            return this.connected;
+        }
+
+        @Override
+        public boolean isClosed() {
+            return this.closed;
+        }
+
+        /** Marks the socket closed first, then fails, so isClosed() is true even though close() threw. */
+        @Override
+        public synchronized void close() throws IOException {
+            this.closed = true;
+            throw new IOException("Faking a close failure");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/a53540da/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/impl/MllpSocketUtilFindXxxOfBlockTest.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/impl/MllpSocketUtilFindXxxOfBlockTest.java b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/impl/MllpSocketUtilFindXxxOfBlockTest.java
new file mode 100644
index 0000000..31704a5
--- /dev/null
+++ b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/impl/MllpSocketUtilFindXxxOfBlockTest.java
@@ -0,0 +1,241 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mllp.impl;
+
+import org.apache.camel.test.util.PayloadBuilder;
+import org.junit.Test;
+
+import static org.apache.camel.component.mllp.MllpEndpoint.END_OF_BLOCK;
+import static org.apache.camel.component.mllp.MllpEndpoint.END_OF_DATA;
+import static org.apache.camel.component.mllp.MllpEndpoint.START_OF_BLOCK;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for MllpSocketUtil.findStartOfBlock(...) and MllpSocketUtil.findEndOfMessage(...). The expected
+ * values below are array indexes, or -1 when the marker is not reported.
+ */
+public class MllpSocketUtilFindXxxOfBlockTest {
+    static final String HL7_PAYLOAD_STRING =
+            "MSH|^~\\&|JCAPS|CC|ADT|EPIC|20161206193919|RISTECH|ACK^A08|00001|D|2.3^^|||||||" + '\r'
+                    + "MSA|AA|00001|" + '\r'
+                    + '\n';
+
+    @Test
+    public void testFindStartOfBlockWithDummyPayload() throws Exception {
+        byte[] payload = PayloadBuilder.build(START_OF_BLOCK, "Dummy non-hl7 payload", END_OF_BLOCK, END_OF_DATA);
+
+        assertEquals(0, MllpSocketUtil.findStartOfBlock(payload));
+    }
+
+    @Test
+    public void testFindStartOfBlockWithHl7Payload() throws Exception {
+        byte[] payload = PayloadBuilder.build(START_OF_BLOCK, HL7_PAYLOAD_STRING, END_OF_BLOCK, END_OF_DATA);
+
+        assertEquals(0, MllpSocketUtil.findStartOfBlock(payload));
+    }
+
+    @Test
+    public void testFindStartOfBlockWithNullPayload() throws Exception {
+        // A null array is expected to yield -1 whatever length is passed
+        assertEquals(-1, MllpSocketUtil.findStartOfBlock(null, 12345));
+    }
+
+    @Test
+    public void testFindStartOfBlockWithOnlyStartOfBlock() throws Exception {
+        byte[] payload = PayloadBuilder.build(START_OF_BLOCK);
+
+        assertEquals(0, MllpSocketUtil.findStartOfBlock(payload));
+    }
+
+    @Test
+    public void testFindStartOfBlockWithStartOfBlockAfterLength() throws Exception {
+        byte[] payload = PayloadBuilder.build(HL7_PAYLOAD_STRING, END_OF_BLOCK, END_OF_DATA, START_OF_BLOCK);
+
+        // START_OF_BLOCK is the last byte, so a length one short of the array excludes it
+        assertEquals(-1, MllpSocketUtil.findStartOfBlock(payload, payload.length - 1));
+    }
+
+    @Test
+    public void testFindStartOfBlockWithMissingStartOfBlock() throws Exception {
+        byte[] payload = PayloadBuilder.build(HL7_PAYLOAD_STRING, END_OF_BLOCK, END_OF_DATA);
+
+        assertEquals(-1, MllpSocketUtil.findStartOfBlock(payload));
+    }
+
+    @Test
+    public void testFindStartOfBlockWithLengthLargerThanArraySize() throws Exception {
+        byte[] payload = PayloadBuilder.build(START_OF_BLOCK, HL7_PAYLOAD_STRING, END_OF_BLOCK, END_OF_DATA);
+
+        assertEquals(0, MllpSocketUtil.findStartOfBlock(payload, payload.length + 1));
+    }
+
+    @Test
+    public void testFindStartOfBlockWithLengthSmallerThanArraySize() throws Exception {
+        byte[] payload = PayloadBuilder.build(START_OF_BLOCK, HL7_PAYLOAD_STRING, END_OF_BLOCK, END_OF_DATA);
+
+        assertEquals(0, MllpSocketUtil.findStartOfBlock(payload, payload.length - 2));
+    }
+
+    @Test
+    public void testFindEndOfMessageWithDummyPayload() throws Exception {
+        final byte[] dummyPayload = "Dummy non-hl7 payload".getBytes();
+        byte[] payload = PayloadBuilder.build(dummyPayload, END_OF_BLOCK, END_OF_DATA);
+
+        // END_OF_BLOCK sits directly after the dummy bytes
+        assertEquals(dummyPayload.length, MllpSocketUtil.findEndOfMessage(payload));
+    }
+
+    @Test
+    public void testFindEndOfMessageWithDummyPayloadAndStartOfBlock() throws Exception {
+        final byte[] dummyPayload = "Dummy non-hl7 payload".getBytes();
+        byte[] payload = PayloadBuilder.build(START_OF_BLOCK, dummyPayload, END_OF_BLOCK, END_OF_DATA);
+
+        // One extra leading byte (START_OF_BLOCK) shifts the expected index by one
+        assertEquals(dummyPayload.length + 1, MllpSocketUtil.findEndOfMessage(payload));
+    }
+
+    @Test
+    public void testFindEndOfMessageWithHl7Payload() throws Exception {
+        byte[] payload = PayloadBuilder.build(HL7_PAYLOAD_STRING, END_OF_BLOCK, END_OF_DATA);
+
+        assertEquals(HL7_PAYLOAD_STRING.length(), MllpSocketUtil.findEndOfMessage(payload));
+    }
+
+    @Test
+    public void testFindEndOfMessageWithHl7PayloadAndStartOfBlock() throws Exception {
+        byte[] payload = PayloadBuilder.build(START_OF_BLOCK, HL7_PAYLOAD_STRING, END_OF_BLOCK, END_OF_DATA);
+
+        assertEquals(HL7_PAYLOAD_STRING.length() + 1, MllpSocketUtil.findEndOfMessage(payload));
+    }
+
+    @Test
+    public void testFindEndOfMessageWithNullPayload() throws Exception {
+        int actual = MllpSocketUtil.findEndOfMessage(null, 12345);
+
+        assertEquals(-1, actual);
+    }
+
+    @Test
+    public void testFindEndOfMessagekWithOnlyEndOfBlock() throws Exception {
+        byte[] payload = PayloadBuilder.build(END_OF_BLOCK);
+
+        assertEquals(-1, MllpSocketUtil.findEndOfMessage(payload));
+    }
+
+    @Test
+    public void testFindEndOfMessagekWithOnlyEndOfData() throws Exception {
+        byte[] payload = PayloadBuilder.build(END_OF_DATA);
+
+        assertEquals(-1, MllpSocketUtil.findEndOfMessage(payload));
+    }
+
+    @Test
+    public void testFindEndOfMessagekWithOnlyEndOfBlockAndEndOfData() throws Exception {
+        byte[] payload = PayloadBuilder.build(END_OF_BLOCK, END_OF_DATA);
+
+        assertEquals(0, MllpSocketUtil.findEndOfMessage(payload));
+    }
+
+    @Test
+    public void testFindEndOfMessageWithEndOfBlockAfterLength() throws Exception {
+        byte[] payload = PayloadBuilder.build(HL7_PAYLOAD_STRING, END_OF_BLOCK, END_OF_DATA);
+
+        // A length two short of the array excludes both END_OF_BLOCK and END_OF_DATA
+        assertEquals(-1, MllpSocketUtil.findEndOfMessage(payload, payload.length - 2));
+    }
+
+    @Test
+    public void testFindEndOfMessageWithMissingEndOfBlock() throws Exception {
+        byte[] payload = PayloadBuilder.build(HL7_PAYLOAD_STRING, END_OF_DATA);
+
+        assertEquals(-1, MllpSocketUtil.findEndOfMessage(payload));
+    }
+
+    @Test
+    public void testFindEndOfMessageWithEndOfBlockButMissingEndOfData() throws Exception {
+        byte[] payload = PayloadBuilder.build(HL7_PAYLOAD_STRING, END_OF_BLOCK);
+
+        assertEquals(-1, MllpSocketUtil.findEndOfMessage(payload));
+    }
+
+    @Test
+    public void testFindEndOfMessageWithStartOfBlockButMissingEndOfBlock() throws Exception {
+        byte[] payload = PayloadBuilder.build(START_OF_BLOCK, HL7_PAYLOAD_STRING, END_OF_DATA);
+
+        assertEquals(-1, MllpSocketUtil.findEndOfMessage(payload));
+    }
+
+    @Test
+    public void testFindEndOfMessageWithStartOfBlockAndEndOfBlockButMissingEndOfData() throws Exception {
+        byte[] payload = PayloadBuilder.build(START_OF_BLOCK, HL7_PAYLOAD_STRING, END_OF_BLOCK);
+
+        assertEquals(-1, MllpSocketUtil.findEndOfMessage(payload));
+    }
+
+    @Test
+    public void testFindEndOfMessageWithLengthLargerThanArraySize() throws Exception {
+        byte[] payload = PayloadBuilder.build(START_OF_BLOCK, HL7_PAYLOAD_STRING, END_OF_BLOCK, END_OF_DATA);
+
+        assertEquals(HL7_PAYLOAD_STRING.length() + 1, MllpSocketUtil.findEndOfMessage(payload, payload.length + 1));
+    }
+
+    @Test
+    public void testFindEndOfMessageWithLengthSmallerThanArraySize() throws Exception {
+        byte[] payload = PayloadBuilder.build(START_OF_BLOCK, HL7_PAYLOAD_STRING, END_OF_BLOCK, END_OF_DATA);
+
+        // A length one short of the array excludes the trailing END_OF_DATA
+        assertEquals(-1, MllpSocketUtil.findEndOfMessage(payload, payload.length - 1));
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/a53540da/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/impl/MllpSocketUtilSocketTest.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/impl/MllpSocketUtilSocketTest.java b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/impl/MllpSocketUtilSocketTest.java
new file mode 100644
index 0000000..9f7007c
--- /dev/null
+++ b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/impl/MllpSocketUtilSocketTest.java
@@ -0,0 +1,288 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mllp.impl;
+
+import java.net.ServerSocket;
+import java.net.Socket;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+public class MllpSocketUtilSocketTest {
+    Logger logger = LoggerFactory.getLogger(this.getClass());
+    ServerSocket serverSocket;
+    Socket socket;
+
+    @Before
+    public void setUp() throws Exception {
+        serverSocket = new ServerSocket(0);
+        socket = new Socket();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        if (socket != null) {
+            socket.close();
+        }
+        serverSocket.close();
+    }
+
+    @Test
+    public void testSetSoTimeout() throws Exception {
+        final int expected = 1000;
+
+        connect();
+
+        MllpSocketUtil.setSoTimeout(socket, expected, null, null);
+
+        assertEquals(expected, socket.getSoTimeout());
+    }
+
+    @Test
+    public void testSetSoTimeoutWithLogger() throws Exception {
+        final int expected = 1000;
+
+        connect();
+
+        MllpSocketUtil.setSoTimeout(socket, expected, logger, null);
+
+        assertEquals(expected, socket.getSoTimeout());
+    }
+
+    @Test
+    public void testSetSoTimeoutWithLoggerAndReason() throws Exception {
+        final int expected = 1000;
+
+        connect();
+
+        MllpSocketUtil.setSoTimeout(socket, expected, logger, "Testing setSoTimeout");
+
+        assertEquals(expected, socket.getSoTimeout());
+    }
+
+    @Test
+    public void testSetSoTimeoutWithUnconnectedSocket() throws Exception {
+        int expected = 1000;
+
+        MllpSocketUtil.setSoTimeout(socket, expected, logger, "Testing setSoTimeout with unconnected Socket");
+
+        assertEquals(expected, socket.getSoTimeout());
+    }
+
+    @Test
+    public void testSetSoTimeoutWithClosedSocket() throws Exception {
+        int expected = 1000;
+
+        connect();
+        close();
+
+        MllpSocketUtil.setSoTimeout(socket, expected, logger, "Testing setSoTimeout with closed Socket");
+
+        // We can't get the SO_TIMEOUT from a closed socket (Socket.getSoTimeout() will throw a SocketException),
+        // so the read-back check stays disabled: assertEquals(expected, socket.getSoTimeout());
+    }
+
+    @Test
+    public void testSetSoTimeoutWithResetSocket() throws Exception {
+        int expected = 1000;
+
+        connect();
+        // Use the reset() helper (SO_LINGER 0, then close) so this case really exercises a reset socket,
+        // consistent with testCloseWithResetSocket and testResetWithResetSocket.
+        reset();
+
+        MllpSocketUtil.setSoTimeout(socket, expected, logger, "Testing setSoTimeout with reset Socket");
+
+        // We can't get the SO_TIMEOUT from a closed socket (Socket.getSoTimeout() will throw a SocketException),
+        // so this test passes as long as the setSoTimeout call above does not throw.
+    }
+
+    @Test
+    public void testClose() throws Exception {
+        connect();
+
+        MllpSocketUtil.close(socket, null, null);
+
+        assertTrue("Socket should be closed", socket.isClosed());
+    }
+
+    @Test
+    public void testCloseWithLogger() throws Exception {
+        connect();
+
+        MllpSocketUtil.close(socket, logger, null);
+
+        assertTrue("Socket should be closed", socket.isClosed());
+    }
+
+    @Test
+    public void testCloseWithLoggerAndReason() throws Exception {
+        connect();
+
+        MllpSocketUtil.close(socket, logger, "Testing close");
+
+        assertTrue("Socket should be closed", socket.isClosed());
+    }
+
+    @Test
+    public void testCloseWithUnconnectedSocket() throws Exception {
+        MllpSocketUtil.close(socket, logger, "Testing close with unconnected Socket");
+
+        assertFalse("Socket should NOT be closed because it was never connected", socket.isClosed());
+    }
+
+    @Test
+    public void testCloseWithClosedSocket() throws Exception {
+        connect();
+        close();
+
+        MllpSocketUtil.close(socket, logger, "Testing close with closed Socket");
+
+        assertTrue("Socket should be closed", socket.isClosed());
+    }
+
+    @Test
+    public void testCloseWithResetSocket() throws Exception {
+        connect();
+        reset();
+
+        MllpSocketUtil.close(socket, logger, "Testing close with reset Socket");
+
+        assertTrue("Socket should be closed", socket.isClosed());
+    }
+
+    @Test
+    public void testReset() throws Exception {
+        connect();
+
+        MllpSocketUtil.reset(socket, null, null);
+
+        assertTrue("Socket should be closed", socket.isClosed());
+    }
+
+    @Test
+    public void testResetWithLogger() throws Exception {
+        connect();
+
+        MllpSocketUtil.reset(socket, logger, null);
+
+        assertTrue("Socket should be closed", socket.isClosed());
+    }
+
+    @Test
+    public void testResetWithLoggerAndReason() throws Exception {
+        connect();
+
+        MllpSocketUtil.reset(socket, logger, "Testing reset");
+
+        assertTrue("Socket should be closed", socket.isClosed());
+    }
+
+    @Test
+    public void testResetWithUnconnectedSocket() throws Exception {
+        MllpSocketUtil.reset(socket, logger, "Testing reset with unconnected Socket");
+
+        assertFalse("Socket should NOT be closed because it was never connected", socket.isClosed());
+    }
+
+    @Test
+    public void testResetWithClosedSocket() throws Exception {
+        connect();
+        close();
+
+        MllpSocketUtil.reset(socket, logger, "Testing reset with closed Socket");
+
+        assertTrue("Socket should be closed", socket.isClosed());
+    }
+
+    @Test
+    public void testResetWithResetSocket() throws Exception {
+        connect();
+        reset();
+
+        MllpSocketUtil.reset(socket, logger, "Testing reset with reset Socket");
+
+        assertTrue("Socket should be closed", socket.isClosed());
+    }
+
+    @Test
+    public void testGetAddressString() throws Exception {
+        connect();
+
+        String address = MllpSocketUtil.getAddressString(socket);
+
+        assertNotNull("Should have an address string", address);
+    }
+
+    @Test
+    public void testGetAddressStringWithUnconnectedSocket() throws Exception {
+        String address = MllpSocketUtil.getAddressString(socket);
+
+        assertNotNull("Should have an address string", address);
+    }
+
+    @Test
+    public void testGetAddressStringWithClosedSocket() throws Exception {
+        connect();
+        close();
+
+        String address = MllpSocketUtil.getAddressString(socket);
+
+        assertNotNull("Should have an address string", address);
+    }
+
+    // Utility Methods
+
+    private void connect() throws Exception {
+        if (socket == null) { // no socket yet: create one that is connected on construction
+            socket = new Socket(serverSocket.getInetAddress().getHostAddress(), serverSocket.getLocalPort());
+        } else { // connect the existing socket instead of discarding it (the old check was inverted)
+            socket.connect(serverSocket.getLocalSocketAddress());
+        }
+
+        assertTrue("Socket should be open", socket.isConnected() && !socket.isClosed());
+    }
+
+    private void close() throws Exception {
+        if (socket != null) {
+            if (socket.isConnected() && !socket.isClosed()) {
+                socket.close();
+            }
+
+            assertTrue("Socket should have been connected and closed", socket.isConnected() && socket.isClosed());
+        }
+    }
+
+    private void reset() throws Exception {
+        if (socket != null) {
+            if (socket.isConnected() && !socket.isClosed()) {
+                socket.setSoLinger(true, 0);
+                socket.close();
+            }
+
+            assertTrue("Socket should have been connected and closed", socket.isConnected() && socket.isClosed());
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/a53540da/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/impl/MllpSocketWriterTestSupport.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/impl/MllpSocketWriterTestSupport.java b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/impl/MllpSocketWriterTestSupport.java
new file mode 100644
index 0000000..a819771
--- /dev/null
+++ b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/impl/MllpSocketWriterTestSupport.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mllp.impl;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.Socket;
+
+public abstract class MllpSocketWriterTestSupport {
+    static final String TEST_MESSAGE =
+        "MSH|^~\\&|ADT|EPIC|JCAPS|CC|20161206193919|RISTECH|ADT^A08|00001|D|2.3^^|||||||" + '\r'
+            + "EVN|A08|20150107161440||REG_UPDATE_SEND_VISIT_MESSAGES_ON_PATIENT_CHANGES|RISTECH^RADIOLOGY^TECHNOLOGIST^^^^^^UCLA^^^^^RRMC||" + '\r'
+            + "PID|1|2100355^^^MRN^MRN|2100355^^^MRN^MRN||MDCLS9^MC9||19700109|F||U|111 HOVER STREET^^LOS ANGELES^CA^90032^USA^P^^LOS ANGELE|LOS ANGELE|"
+                + "(310)725-6952^P^PH^^^310^7256952||ENGLISH|U||60000013647|565-33-2222|||U||||||||N||" + '\r'
+            + "PD1|||UCLA HEALTH SYSTEM^^10|10002116^ADAMS^JOHN^D^^^^^EPIC^^^^PROVID||||||||||||||" + '\r'
+            + "NK1|1|DOE^MC9^^|OTH|^^^^^USA|(310)888-9999^^^^^310^8889999|(310)999-2222^^^^^310^9992222|Emergency Contact 1|||||||||||||||||||||||||||" + '\r'
+            + "PV1|1|OUTPATIENT|RR CT^^^1000^^^^^^^DEPID|EL|||017511^TOBIAS^JONATHAN^^^^^^EPIC^^^^PROVID|017511^TOBIAS^JONATHAN^^^^^^EPIC^^^^PROVID||||||"
+                + "CLR|||||60000013647|SELF|||||||||||||||||||||HOV_CONF|^^^1000^^^^^^^||20150107161438||||||||||" + '\r'
+            + "PV2||||||||20150107161438||||CT BRAIN W WO CONTRAST||||||||||N|||||||||||||||||||||||||||" + '\r'
+            + "ZPV||||||||||||20150107161438|||||||||" + '\r'
+            + "AL1|1||33361^NO KNOWN ALLERGIES^^NOTCOMPUTRITION^NO KNOWN ALLERGIES^EXTELG||||||" + '\r'
+            + "DG1|1|DX|784.0^Headache^DX|Headache||VISIT" + '\r'
+            + "GT1|1|1000235129|MDCLS9^MC9^^||111 HOVER STREET^^LOS ANGELES^CA^90032^USA^^^LOS ANGELE|(310)725-6952^^^^^310^7256952||19700109|F|P/F|SLF|"
+                + "565-33-2222|||||^^^^^USA|||UNKNOWN|||||||||||||||||||||||||||||" + '\r'
+            + "UB2||||||||" + '\r'
+            + '\n';
+
+    static final String TEST_ACKNOWLEDGEMENT =
+        "MSH|^~\\&|JCAPS|CC|ADT|EPIC|20161206193919|RISTECH|ACK^A08|00001|D|2.3^^|||||||" + '\r'
+            + "MSA|AA|00001|" + '\r'
+            + '\n';
+
+    FakeSocket fakeSocket = new FakeSocket();
+
+    class FakeSocket extends Socket {
+        boolean connected = true;
+        boolean closed;
+
+        FakeSocketOutputStream fakeSocketOutputStream = new FakeSocketOutputStream();
+
+        FakeSocket() {
+        }
+
+        @Override
+        public boolean isConnected() {
+            return connected;
+        }
+
+        @Override
+        public boolean isClosed() {
+            return closed;
+        }
+
+        @Override
+        public OutputStream getOutputStream() throws IOException {
+            if (fakeSocketOutputStream == null) {
+                return super.getOutputStream();
+            }
+            return fakeSocketOutputStream;
+        }
+
+        byte[] payload() {
+            if (fakeSocketOutputStream != null) {
+                return fakeSocketOutputStream.fakeOutputStream.toByteArray();
+            }
+
+            return null;
+        }
+    }
+
+    class FakeSocketOutputStream extends OutputStream {
+        ByteArrayOutputStream fakeOutputStream = new ByteArrayOutputStream();
+
+        boolean failOnWrite;
+        boolean failOnWriteArray;
+
+        Byte writeFailOn;
+        byte[] writeArrayFailOn;
+
+        FakeSocketOutputStream() {
+        }
+
+        @Override
+        public void write(int b) throws IOException {
+            if (failOnWrite) {
+                throw new IOException("Faking write failure");
+            } else if (writeFailOn != null && writeFailOn == b) {
+                throw new IOException("Faking write failure");
+            }
+
+            fakeOutputStream.write(b);
+        }
+
+        @Override
+        public void write(byte[] array, int off, int len) throws IOException {
+            if (failOnWriteArray) {
+                throw new IOException("Faking write array failure");
+            }
+
+            if (writeArrayFailOn != null) {
+                if (writeArrayFailOn == array) {
+                    throw new IOException("Faking write array failure");
+                }
+                for (int i = 0; i < Math.min(len, writeArrayFailOn.length); ++i) {
+                    if (array[off + i] != writeArrayFailOn[i]) {
+                        super.write(array, off, len);
+                        return;
+                    }
+                }
+                throw new IOException("Faking write array failure");
+            } else {
+                super.write(array, off, len);
+            }
+        }
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/a53540da/components/camel-mllp/src/test/java/org/apache/camel/test/junit/rule/mllp/MllpClientResource.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/test/java/org/apache/camel/test/junit/rule/mllp/MllpClientResource.java b/components/camel-mllp/src/test/java/org/apache/camel/test/junit/rule/mllp/MllpClientResource.java
index 8157e69..56634eb 100644
--- a/components/camel-mllp/src/test/java/org/apache/camel/test/junit/rule/mllp/MllpClientResource.java
+++ b/components/camel-mllp/src/test/java/org/apache/camel/test/junit/rule/mllp/MllpClientResource.java
@@ -60,6 +60,7 @@ public class MllpClientResource extends ExternalResource {
     boolean reuseAddress;
     boolean tcpNoDelay = true;
 
+    DisconnectMethod disconnectMethod = DisconnectMethod.CLOSE;
 
     /**
      * Use this constructor to avoid having the connection started by JUnit (since the port is still -1)
@@ -89,7 +90,22 @@ public class MllpClientResource extends ExternalResource {
     @Override
     protected void after() {
         super.after();
-        this.disconnect();
+        this.close();
+    }
+
+    public void close() {
+        try {
+            if (null != inputStream) {
+                clientSocket.close();
+            }
+        } catch (IOException e) {
+            log.warn(String.format("Exception encountered closing connection to %s:%s", mllpHost, mllpPort), e);
+        } finally {
+            inputStream = null;
+            outputStream = null;
+            clientSocket = null;
+        }
+        return;
     }
 
     public void connect() {
@@ -116,33 +132,36 @@ public class MllpClientResource extends ExternalResource {
         }
     }
 
-    public void close() {
-        this.disconnect();
-        return;
-    }
-
     public void reset() {
         try {
             clientSocket.setSoLinger(true, 0);
         } catch (SocketException socketEx) {
             log.warn("Exception encountered setting set SO_LINGER to force a TCP reset", socketEx);
         }
-        this.disconnect();
-        return;
-    }
-
-    public void disconnect() {
         try {
             if (null != inputStream) {
                 clientSocket.close();
             }
         } catch (IOException e) {
-            log.warn(String.format("Exception encountered closing connection to {}:{}", mllpHost, mllpPort), e);
+            log.warn(String.format("Exception encountered resetting connection to %s:%s", mllpHost, mllpPort), e);
         } finally {
             inputStream = null;
             outputStream = null;
             clientSocket = null;
         }
+        return;
+    }
+
+    public void disconnect() {
+        if (DisconnectMethod.RESET == disconnectMethod) {
+            reset();
+        } else {
+            close();
+        }
+    }
+
+    public DisconnectMethod getDisconnectMethod() {
+        return disconnectMethod;
     }
 
     public boolean isConnected() {
@@ -458,4 +477,12 @@ public class MllpClientResource extends ExternalResource {
         this.tcpNoDelay = tcpNoDelay;
     }
 
+    public void setDisconnectMethod(DisconnectMethod disconnectMethod) {
+        this.disconnectMethod = disconnectMethod;
+    }
+
+    public enum DisconnectMethod {
+        CLOSE,
+        RESET
+    }
 }
\ No newline at end of file


Mime
View raw message