camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject [4/6] camel git commit: CAMEL-10511: Updated MllpTcpClientProducer and MllpTcpServerConsumer to consume all available data on socket
Date Thu, 15 Dec 2016 19:20:33 GMT
http://git-wip-us.apache.org/repos/asf/camel/blob/91121843/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/impl/MllpUtil.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/impl/MllpUtil.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/impl/MllpUtil.java
deleted file mode 100644
index 20315e7..0000000
--- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/impl/MllpUtil.java
+++ /dev/null
@@ -1,390 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.mllp.impl;
-
-import java.io.BufferedOutputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.Socket;
-import java.net.SocketException;
-import java.net.SocketTimeoutException;
-
-import org.apache.camel.component.mllp.MllpComponent;
-import org.apache.camel.component.mllp.MllpException;
-import org.apache.camel.component.mllp.MllpFrameException;
-import org.apache.camel.component.mllp.MllpTimeoutException;
-import org.apache.camel.component.mllp.MllpWriteException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.camel.component.mllp.MllpEndpoint.END_OF_BLOCK;
-import static org.apache.camel.component.mllp.MllpEndpoint.END_OF_DATA;
-import static org.apache.camel.component.mllp.MllpEndpoint.END_OF_STREAM;
-import static org.apache.camel.component.mllp.MllpEndpoint.START_OF_BLOCK;
-
-/**
- * Supplies methods to read and write messages in a MLLP Frame.
- * <p/>
- * Although the methods in the class are intended to handle HL7 v2 formatted messages, the methods do not
- * depend on that format - any byte[] can be written to the Socket.  Also, any byte[] can be read from the socket
- * provided it has the proper MLLP Enveloping - <START_OF_BLOCK>payload<END_OF_BLOCK><END_OF_DATA>.
- * <p/>
- * NOTE: MLLP payloads are not logged unless the logging level is set to DEBUG or TRACE to avoid introducing PHI
- * into the log files.  Logging of PHI can be globally disabled by setting the org.apache.camel.mllp.logPHI system
- * property.  The property is evaluated using Boolean.parseBoolean.
- * <p/>
- */
-public final class MllpUtil {
-    private static final Logger LOG = LoggerFactory.getLogger(MllpUtil.class);
-
-    private MllpUtil() {
-    }
-
-    /**
-     * Open the MLLP frame by reading from the Socket until the beginning of the frame is found.
-     * <p/>
-     * If any errors occur (including MLLP frame errors) while opening the frame, the socket will be closed and an
-     * Exception will be thrown.
-     *
-     * @param socket the Socket to read
-     * @throws SocketTimeoutException    thrown if a timeout occurs while looking for the beginning of the MLLP frame, but
-     *                                   nothing is yet available - this is NOT an error condition
-     * @throws MllpFrameException if the MLLP Frame is corrupted in some way
-     * @throws MllpException             for other unexpected error conditions
-     */
-    public static boolean openFrame(Socket socket, int receiveTimeout, int readTimeout) throws SocketTimeoutException, MllpFrameException, MllpException {
-        if (socket.isConnected() && !socket.isClosed()) {
-            InputStream socketInputStream = MllpUtil.getInputStream(socket);
-
-            int readByte = -1;
-            try {
-                socket.setSoTimeout(receiveTimeout);
-                readByte = socketInputStream.read();
-                switch (readByte) {
-                case START_OF_BLOCK:
-                    return true;
-                case END_OF_STREAM:
-                    resetConnection(socket);
-                    return false;
-                default:
-                    // Continue on and process the out-of-frame data
-                }
-            } catch (SocketTimeoutException normaTimeoutEx) {
-                // Just pass this on - the caller will wrap it in a MllpTimeoutException
-                throw normaTimeoutEx;
-            } catch (SocketException socketEx) {
-                if (socket.isClosed()) {
-                    LOG.debug("Socket closed while opening MLLP frame - ignoring exception", socketEx);
-                    return false;
-                } else {
-                    LOG.error("Unexpected Exception occurred opening MLLP frame - resetting the connection");
-                    MllpUtil.resetConnection(socket);
-                    throw new MllpException("Unexpected Exception occurred opening MLLP frame", socketEx);
-                }
-            } catch (IOException unexpectedException) {
-                LOG.error("Unexpected Exception occurred opening MLLP frame - resetting the connection");
-                MllpUtil.resetConnection(socket);
-                throw new MllpException("Unexpected Exception occurred opening MLLP frame", unexpectedException);
-            }
-
-            /*
-             From here on, we're in a bad frame state.  Read what's left in the socket, close the connection and
-             return the out-of-frame data.
-              */
-            ByteArrayOutputStream outOfFrameData = new ByteArrayOutputStream();
-            outOfFrameData.write(readByte);
-
-            try {
-                socket.setSoTimeout(readTimeout);
-                while (true) {
-                    readByte = socketInputStream.read();
-                    switch (readByte) {
-                    case END_OF_STREAM:
-                        if (isLogPHIEnabled(LOG)) {
-                            LOG.error("END_OF_STREAM read while looking for the beginning of the MLLP frame, and "
-                                            + "out-of-frame data had been read - resetting connection and eating out-of-frame data: {}",
-                                    outOfFrameData.toString().replace('\r', '\n'));
-                        } else {
-                            LOG.error("END_OF_STREAM read while looking for the beginning of the MLLP frame, and out-of-frame data had been read - resetting connection and eating out-of-frame data");
-                        }
-                        resetConnection(socket);
-
-                        throw new MllpFrameException("END_OF_STREAM read while looking for the beginning of the MLLP frame", outOfFrameData.toByteArray());
-                    case START_OF_BLOCK:
-                        if (isLogPHIEnabled(LOG)) {
-                            LOG.warn("The beginning of the MLLP frame was preceded by out-of-frame data - eating data: {}", outOfFrameData.toString().replace('\r', '\n'));
-                        } else {
-                            LOG.warn("The beginning of the MLLP frame was preceded by out-of-frame data - eating data");
-                        }
-
-                        throw new MllpFrameException("The beginning of the MLLP frame was preceded by out-of-frame data", outOfFrameData.toByteArray());
-                    default:
-                        // still reading out-of-frame data
-                        outOfFrameData.write(readByte);
-                        break;
-                    }
-                }
-            } catch (SocketTimeoutException timeoutEx) {
-                if (isLogPHIEnabled(LOG)) {
-                    LOG.error("Timeout looking for the beginning of the MLLP frame, and out-of-frame data had been read - resetting connection and eating out-of-frame data: {}",
-                            outOfFrameData.toString().replace('\r', '\n'));
-                } else {
-                    LOG.error("Timeout looking for the beginning of the MLLP frame, and out-of-frame data had been read - resetting connection and eating out-of-frame data");
-                }
-
-                resetConnection(socket);
-
-                throw new MllpFrameException("Timeout looking for the beginning of the MLLP frame, and out-of-frame data had been read", outOfFrameData.toByteArray());
-            } catch (IOException e) {
-                if (isLogPHIEnabled(LOG)) {
-                    LOG.error("Exception encountered looking for the beginning of the MLLP frame, and out-of-frame data had been read - resetting connection and eating out-of-frame data: {}",
-                            outOfFrameData.toString().replace('\r', '\n'));
-                } else {
-                    LOG.error("Exception encountered looking for the beginning of the MLLP frame, and out-of-frame data had been read - resetting connection and eating out-of-frame data");
-                }
-
-                resetConnection(socket);
-
-                throw new MllpFrameException("Exception encountered looking for the beginning of the MLLP frame, and out-of-frame data had been read", outOfFrameData.toByteArray());
-            }
-        }
-
-        return false;
-    }
-
-    /**
-     * Close a MLLP frame by reading from the socket until the end of the frame is found.
-     * <p/>
-     * The method assumes the MLLP frame has already been opened and the first byte available
-     * will be the first byte of the framed message.
-     * <p/>
-     * The method consumes the END_OF_BLOCK and END_OF_DATA bytes from the stream before returning the payload
-     * <p/>
-     * If any errors occur (including MLLP frame errors) while closing the frame, the socket will be closed and an
-     * Exception will be thrown.
-     *
-     * @param socket the Socket to be read
-     * @return the payload of the MLLP-Enveloped message as a byte[]
-     * @throws MllpTimeoutException      thrown if a timeout occurs while closing the MLLP frame
-     * @throws MllpFrameException if the MLLP Frame is corrupted in some way
-     * @throws MllpException             for other unexpected error conditions
-     */
-    public static byte[] closeFrame(Socket socket, int receiveTimeout, int readTimeout) throws MllpTimeoutException, MllpFrameException, MllpException {
-        if (socket.isConnected() && !socket.isClosed()) {
-            InputStream socketInputStream = MllpUtil.getInputStream(socket);
-            // TODO:  Come up with an intelligent way to size this stream
-            ByteArrayOutputStream payload = new ByteArrayOutputStream(4096);
-            try {
-                socket.setSoTimeout(readTimeout);
-                while (true) {
-                    int readByte = socketInputStream.read();
-                    switch (readByte) {
-                    case END_OF_STREAM:
-                        if (isLogPHIEnabled(LOG)) {
-                            LOG.error("END_OF_STREAM read while looking for the end of the MLLP frame - resetting connection and eating data: {}", payload.toString().replace('\r', '\n'));
-                        } else {
-                            LOG.error("END_OF_STREAM read while looking for the end of the MLLP frame - resetting connection and eating data");
-                        }
-
-                        resetConnection(socket);
-
-                        throw new MllpFrameException("END_OF_STREAM read while looking for the end of the MLLP frame", payload.size() > 0 ? payload.toByteArray() : null);
-                    case START_OF_BLOCK:
-                        if (isLogPHIEnabled(LOG)) {
-                            LOG.error("A new MLLP frame was opened before the previous frame was closed - resetting connection and eating data: {}", payload.toString().replace('\r', '\n'));
-                        } else {
-                            LOG.error("A new MLLP frame was opened before the previous frame was closed - resetting connection and eating data");
-                        }
-
-                        resetConnection(socket);
-
-                        throw new MllpFrameException("A new MLLP frame was opened before the previous frame was closed", payload.size() > 0 ? payload.toByteArray() : null);
-                    case END_OF_BLOCK:
-                        if (END_OF_DATA != socketInputStream.read()) {
-                            if (isLogPHIEnabled(LOG)) {
-                                LOG.error("The MLLP frame was partially closed - END_OF_BLOCK was not followed by END_OF_DATA - resetting connection and eating data: {}",
-                                        payload.toString().replace('\r', '\n'));
-                            } else {
-                                LOG.error("The MLLP frame was partially closed - END_OF_BLOCK was not followed by END_OF_DATA - resetting connection and eating data");
-                            }
-
-                            resetConnection(socket);
-
-                            throw new MllpFrameException("The MLLP frame was partially closed - END_OF_BLOCK was not followed by END_OF_DATA",
-                                    payload.size() > 0 ? payload.toByteArray() : null);
-                        }
-                        socket.setSoTimeout(receiveTimeout);
-                        return payload.toByteArray();
-                    default:
-                        // log.trace( "Read Character: {}", (char)readByte );
-                        payload.write(readByte);
-                    }
-                }
-            } catch (SocketTimeoutException timeoutEx) {
-                if (0 < payload.size()) {
-                    if (isLogPHIEnabled(LOG)) {
-                        LOG.error("Timeout looking for the end of the MLLP frame - resetting connection and eating data: {}", payload.toString().replace('\r', '\n'));
-                    } else {
-                        LOG.error("Timeout looking for the end of the MLLP frame - resetting connection and eating data");
-                    }
-                } else {
-                    LOG.error("Timeout looking for the end of the MLLP frame - resetting connection");
-                }
-
-                resetConnection(socket);
-
-                throw new MllpFrameException("Timeout looking for the end of the MLLP frame", payload.size() > 0 ? payload.toByteArray() : null, timeoutEx);
-            } catch (IOException ioEx) {
-                if (0 < payload.size()) {
-                    if (isLogPHIEnabled(LOG)) {
-                        LOG.error("Exception encountered looking for the end of the MLLP frame - resetting connection and eating data: {}", payload.toString().replace('\r', '\n'));
-                    } else {
-                        LOG.error("Exception encountered looking for the end of the MLLP frame - resetting connection and eating data");
-                    }
-                } else {
-                    LOG.error("Exception encountered looking for the end of the MLLP frame - resetting connection");
-                }
-
-                resetConnection(socket);
-
-                throw new MllpFrameException("Exception encountered looking for the end of the MLLP frame", payload.size() > 0 ? payload.toByteArray() : null, ioEx);
-            }
-        }
-
-        try {
-            socket.setSoTimeout(receiveTimeout);
-        } catch (SocketException e) {
-            // Eat this exception
-        }
-        return null;
-    }
-
-    /**
-     * Write a MLLP-Framed payload to the Socket; nothing is written (and nothing is
-     * returned) if the socket is not connected or is already closed.
-     *
-     * @param socket  the Socket to write the payload
-     * @param payload the MLLP payload
-     * @throws MllpWriteException if the write fails
-     * @throws MllpException      for other unexpected error conditions
-     */
-    public static void writeFramedPayload(Socket socket, byte[] payload) throws MllpException {
-        if (socket.isConnected() && !socket.isClosed()) {
-            OutputStream outputStream;
-            try {
-                outputStream = new BufferedOutputStream(socket.getOutputStream(), payload.length + 4);
-            } catch (IOException ioEx) {
-                LOG.error("Error Retrieving OutputStream from Socket - resetting connection");
-
-                resetConnection(socket);
-
-                throw new MllpException("Error Retrieving OutputStream from Socket", ioEx);
-            }
-
-            if (null != outputStream) {
-                try {
-                    outputStream.write(START_OF_BLOCK);
-                    outputStream.write(payload, 0, payload.length);
-                    outputStream.write(END_OF_BLOCK);
-                    outputStream.write(END_OF_DATA);
-                    outputStream.flush();
-                } catch (IOException ioEx) {
-                    LOG.error("Error writing MLLP payload - resetting connection");
-
-                    resetConnection(socket);
-
-                    throw new MllpWriteException("Error writing MLLP payload", payload, ioEx);
-                }
-            }
-        }
-    }
-
-    public static void closeConnection(Socket socket) {
-        if (null != socket) {
-            if (!socket.isClosed()) {
-                try {
-                    socket.shutdownInput();
-                } catch (Exception ex) {
-                    LOG.warn("Exception encountered shutting down the input stream on the client socket", ex);
-                }
-
-                try {
-                    socket.shutdownOutput();
-                } catch (Exception ex) {
-                    LOG.warn("Exception encountered shutting down the output stream on the client socket", ex);
-                }
-
-                try {
-                    socket.close();
-                } catch (Exception ex) {
-                    LOG.warn("Exception encountered closing the client socket", ex);
-                }
-            }
-        }
-    }
-
-    public static void resetConnection(Socket socket) {
-        if (null != socket  &&  !socket.isClosed()) {
-            try {
-                socket.setSoLinger(true, 0);
-            } catch (Exception ex) {
-                LOG.warn("Exception encountered setting SO_LINGER to 0 on the socket to force a reset", ex);
-            }
-
-            try {
-                socket.close();
-            } catch (Exception ex) {
-                LOG.warn("Exception encountered closing the client socket", ex);
-            }
-
-        }
-
-    }
-
-    /**
-     * Retrieve the InputStream from the Socket
-     * <p/>
-     * Private utility method that catches IOExceptions when retrieving the InputStream
-     *
-     * @param socket Socket to get the InputStream from
-     * @return the InputStream for the Socket
-     * @throws MllpException when unexpected conditions occur
-     */
-    private static InputStream getInputStream(Socket socket) throws MllpException {
-        InputStream socketInputStream = null;
-        try {
-            socketInputStream = socket.getInputStream();
-        } catch (IOException ioEx) {
-            throw new MllpException("Error Retrieving InputStream from Socket", ioEx);
-        }
-
-        return socketInputStream;
-    }
-
-    private static boolean isLogPHIEnabled(Logger targetLogger) {
-        if (targetLogger.isDebugEnabled()) {
-            if (Boolean.parseBoolean(System.getProperty(MllpComponent.MLLP_LOG_PHI_PROPERTY, "true"))) {
-                return true;
-            }
-        }
-
-        return false;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/camel/blob/91121843/components/camel-mllp/src/main/java/org/apache/camel/processor/mllp/Hl7AcknowledgementGenerator.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/main/java/org/apache/camel/processor/mllp/Hl7AcknowledgementGenerator.java b/components/camel-mllp/src/main/java/org/apache/camel/processor/mllp/Hl7AcknowledgementGenerator.java
index c7f4473..5c0013c 100644
--- a/components/camel-mllp/src/main/java/org/apache/camel/processor/mllp/Hl7AcknowledgementGenerator.java
+++ b/components/camel-mllp/src/main/java/org/apache/camel/processor/mllp/Hl7AcknowledgementGenerator.java
@@ -143,6 +143,7 @@ public class Hl7AcknowledgementGenerator implements Processor {
         acknowledgement.write(SEGMENT_DELIMITER);
 
         // Terminate the message
+        acknowledgement.write(SEGMENT_DELIMITER);
         acknowledgement.write(MESSAGE_TERMINATOR);
 
         return acknowledgement.toByteArray();

http://git-wip-us.apache.org/repos/asf/camel/blob/91121843/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpAcknowledgementExceptionTest.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpAcknowledgementExceptionTest.java b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpAcknowledgementExceptionTest.java
deleted file mode 100644
index 439a3c9..0000000
--- a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpAcknowledgementExceptionTest.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.mllp;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.junit.Assert.assertEquals;
-
-public class MllpAcknowledgementExceptionTest {
-    static final String HL7_MESSAGE = "MSH|^~\\&|APP_A|FAC_A|^org^sys||||ADT^A04^ADT_A04|||2.6" + '\r'
-            + "PID|1||1100832^^^^PI||TEST^FIG||98765432|U||R|435 MAIN STREET^^LONGMONT^CO^80503||123-456-7890|||S" + '\r'
-            + '\r' + '\n';
-
-    static final String HL7_ACKNOWLEDGEMENT = "MSH|^~\\&|^org^sys||APP_A|FAC_A|||ACK^A04^ADT_A04|||2.6" + '\r' + "MSA|AA|" + '\r' + '\n';
-
-    static final String EXCEPTION_MESSAGE_WITH_LOG_PHI_DISABLED = MllpAcknowledgementDeliveryException.EXCEPTION_MESSAGE;
-    static final String EXCEPTION_MESSAGE_WITH_LOG_PHI_ENABLED =
-            String.format("%s:\n\tHL7 Message: %s\n\tHL7 Acknowledgement: %s",
-                    MllpAcknowledgementDeliveryException.EXCEPTION_MESSAGE,
-                    new String(HL7_MESSAGE).replaceAll("\r", "<CR>").replaceAll("\n", "<LF>"),
-                    new String(HL7_ACKNOWLEDGEMENT).replaceAll("\r", "<CR>").replaceAll("\n", "<LF>")
-            );
-
-    Exception exception;
-
-    Logger log = LoggerFactory.getLogger(this.getClass());
-
-    @Before
-    public void setUp() throws Exception {
-        exception = new MllpAcknowledgementDeliveryException(HL7_MESSAGE.getBytes(), HL7_ACKNOWLEDGEMENT.getBytes());
-    }
-
-    @After
-    public void tearDown() throws Exception {
-        System.clearProperty(MllpComponent.MLLP_LOG_PHI_PROPERTY);
-    }
-
-
-    @Test
-    public void testLogPhiDefault() throws Exception {
-        String exceptionMessage = exception.getMessage();
-
-        assertEquals(EXCEPTION_MESSAGE_WITH_LOG_PHI_ENABLED, exceptionMessage);
-    }
-
-    @Test
-    public void testLogPhiDisabled() throws Exception {
-        System.setProperty(MllpComponent.MLLP_LOG_PHI_PROPERTY, "false");
-
-        String exceptionMessage = exception.getMessage();
-
-        assertEquals(EXCEPTION_MESSAGE_WITH_LOG_PHI_DISABLED, exceptionMessage);
-    }
-
-    @Test
-    public void testLogPhiEnabled() throws Exception {
-        System.setProperty(MllpComponent.MLLP_LOG_PHI_PROPERTY, "true");
-
-        String exceptionMessage = exception.getMessage();
-
-        assertEquals(EXCEPTION_MESSAGE_WITH_LOG_PHI_ENABLED, exceptionMessage);
-    }
-
-    @Test
-    public void testNullMessage() throws Exception {
-        final String expectedMessage =
-                String.format("%s:\n\tHL7 Message: null\n\tHL7 Acknowledgement: %s",
-                        MllpAcknowledgementDeliveryException.EXCEPTION_MESSAGE,
-                        new String(HL7_ACKNOWLEDGEMENT).replaceAll("\r", "<CR>").replaceAll("\n", "<LF>")
-                );
-
-        exception = new MllpAcknowledgementDeliveryException(null, HL7_ACKNOWLEDGEMENT.getBytes());
-
-        System.setProperty(MllpComponent.MLLP_LOG_PHI_PROPERTY, "true");
-        String exceptionMessage = exception.getMessage();
-
-        assertEquals(expectedMessage, exceptionMessage);
-    }
-
-    @Test
-    public void testNullAcknowledgement() throws Exception {
-        final String expectedMessage =
-                String.format("%s:\n\tHL7 Message: %s\n\tHL7 Acknowledgement: null",
-                        MllpAcknowledgementDeliveryException.EXCEPTION_MESSAGE,
-                        new String(HL7_MESSAGE).replaceAll("\r", "<CR>").replaceAll("\n", "<LF>")
-                );
-
-        exception = new MllpAcknowledgementDeliveryException(HL7_MESSAGE.getBytes(), null);
-
-        System.setProperty(MllpComponent.MLLP_LOG_PHI_PROPERTY, "true");
-        String exceptionMessage = exception.getMessage();
-
-        assertEquals(expectedMessage, exceptionMessage);
-    }
-
-    @Test
-    public void testToString() throws Exception {
-        final String expectedString =
-                "org.apache.camel.component.mllp.MllpAcknowledgementDeliveryException: "
-                        + "{hl7Message="
-                        +      "MSH|^~\\&|APP_A|FAC_A|^org^sys||||ADT^A04^ADT_A04|||2.6<CR>"
-                        +      "PID|1||1100832^^^^PI||TEST^FIG||98765432|U||R|435 MAIN STREET^^LONGMONT^CO^80503||123-456-7890|||S<CR><CR><LF>"
-                        + ", hl7Acknowledgement="
-                        +      "MSH|^~\\&|^org^sys||APP_A|FAC_A|||ACK^A04^ADT_A04|||2.6<CR>MSA|AA|<CR><LF>"
-                        + "}";
-
-        assertEquals(expectedString, exception.toString());
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/91121843/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpExceptionTest.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpExceptionTest.java b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpExceptionTest.java
new file mode 100644
index 0000000..0c6940b
--- /dev/null
+++ b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpExceptionTest.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mllp;
+
+import org.junit.After;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class MllpExceptionTest {
+    static final String EXCEPTION_MESSAGE = "Test Frame Exception";
+
+    static final String HL7_MESSAGE =
+            "MSH|^~\\&|APP_A|FAC_A|^org^sys||20161206193919||ADT^A04|00001||2.6" + '\r'
+                    + "PID|1||1100832^^^^PI||TEST^FIG||98765432|U||R|435 MAIN STREET^^LONGMONT^CO^80503||123-456-7890|||S" + '\r'
+                    + '\r' + '\n';
+
+    static final String HL7_ACK =
+            "MSH|^~\\&|APP_A|FAC_A|^org^sys||20161206193919||ACK^A04|00002||2.6" + '\r'
+                    + "MSA|AA|00001" + '\r'
+                    + '\r' + '\n';
+
+    @After
+    public void tearDown() throws Exception {
+        System.clearProperty(MllpComponent.MLLP_LOG_PHI_PROPERTY);
+    }
+
+    @Test
+    public void testLogPhiDefault() throws Exception {
+        assertEquals(expectedMessage(HL7_MESSAGE, HL7_ACK), createException(HL7_MESSAGE, HL7_ACK).getMessage());
+    }
+
+    @Test
+    public void testLogPhiDisabled() throws Exception {
+        System.setProperty(MllpComponent.MLLP_LOG_PHI_PROPERTY, "false");
+
+        assertEquals(EXCEPTION_MESSAGE, createException(HL7_MESSAGE, HL7_ACK).getMessage());
+    }
+
+    @Test
+    public void testLogPhiEnabled() throws Exception {
+        System.setProperty(MllpComponent.MLLP_LOG_PHI_PROPERTY, "true");
+
+        assertEquals(expectedMessage(HL7_MESSAGE, HL7_ACK), createException(HL7_MESSAGE, HL7_ACK).getMessage());
+    }
+
+    @Test
+    public void testNullHl7Message() throws Exception {
+        System.setProperty(MllpComponent.MLLP_LOG_PHI_PROPERTY, "true");
+
+        assertEquals(expectedMessage(null, HL7_ACK), createException(null, HL7_ACK).getMessage());
+    }
+
+    @Test
+    public void testNullHl7Acknowledgement() throws Exception {
+        System.setProperty(MllpComponent.MLLP_LOG_PHI_PROPERTY, "true");
+
+        assertEquals(expectedMessage(HL7_MESSAGE, null), createException(HL7_MESSAGE, null).getMessage());
+    }
+
+    @Test
+    public void testNullHl7Payloads() throws Exception {
+        System.setProperty(MllpComponent.MLLP_LOG_PHI_PROPERTY, "true");
+
+        assertEquals(expectedMessage(null, null), createException(null, null).getMessage());
+    }
+
+
+    // Utility methods
+    private Exception createException(String hl7Message, String hl7Acknowledgment) {
+        byte[] hl7MessageBytes = null;
+        byte[] hl7AcknowledgementBytes = null;
+
+        if (hl7Message != null) {
+            hl7MessageBytes = hl7Message.getBytes();
+        }
+
+        if (hl7Acknowledgment != null) {
+            hl7AcknowledgementBytes = hl7Acknowledgment.getBytes();
+        }
+        return new MllpException(EXCEPTION_MESSAGE, hl7MessageBytes, hl7AcknowledgementBytes);
+    }
+
+    private String expectedMessage(String hl7Message, String hl7Acknowledgment) {
+        final String exceptionMessageFormat = EXCEPTION_MESSAGE + " \n\t{hl7Message= %s} \n\t{hl7Acknowledgement= %s}";
+
+        String formattedHl7Message = null;
+        String formattedHl7Acknowledgement = null;
+
+        if (hl7Message != null) {
+            formattedHl7Message = hl7Message.replaceAll("\r", "<CR>").replaceAll("\n", "<LF>");
+        }
+
+        if (hl7Acknowledgment != null) {
+            formattedHl7Acknowledgement = hl7Acknowledgment.replaceAll("\r", "<CR>").replaceAll("\n", "<LF>");
+        }
+
+        return String.format(exceptionMessageFormat, formattedHl7Message, formattedHl7Acknowledgement);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/91121843/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpFrameExceptionTest.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpFrameExceptionTest.java b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpFrameExceptionTest.java
deleted file mode 100644
index ca713dc..0000000
--- a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpFrameExceptionTest.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.mllp;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-
-public class MllpFrameExceptionTest {
-    static final String EXCEPTION_MESSAGE = "Test Frame Exception";
-
-    static final String HL7_MESSAGE =
-            "MSH|^~\\&|APP_A|FAC_A|^org^sys||||ADT^A04^ADT_A04|||2.6" + '\r'
-            + "PID|1||1100832^^^^PI||TEST^FIG||98765432|U||R|435 MAIN STREET^^LONGMONT^CO^80503||123-456-7890|||S" + '\r'
-            + '\r' + '\n';
-
-    static final String EXCEPTION_MESSAGE_WITH_LOG_PHI_DISABLED = EXCEPTION_MESSAGE;
-    static final String EXCEPTION_MESSAGE_WITH_LOG_PHI_ENABLED =
-            String.format(String.format("%s:\n\tMLLP Payload: %s",
-                    EXCEPTION_MESSAGE,
-                    new String(HL7_MESSAGE).replaceAll("\r", "<CR>").replaceAll("\n", "<LF>"))
-            );
-
-    Exception exception;
-
-    @Before
-    public void setUp() throws Exception {
-        exception = new MllpFrameException(EXCEPTION_MESSAGE, HL7_MESSAGE.getBytes());
-    }
-
-    @After
-    public void tearDown() throws Exception {
-        System.clearProperty(MllpComponent.MLLP_LOG_PHI_PROPERTY);
-    }
-
-    @Test
-    public void testLogPhiDefault() throws Exception {
-        String exceptionMessage = exception.getMessage();
-
-        assertEquals(EXCEPTION_MESSAGE_WITH_LOG_PHI_ENABLED, exceptionMessage);
-    }
-
-    @Test
-    public void testLogPhiDisabled() throws Exception {
-        System.setProperty(MllpComponent.MLLP_LOG_PHI_PROPERTY, "false");
-
-        String exceptionMessage = exception.getMessage();
-
-        assertEquals(EXCEPTION_MESSAGE_WITH_LOG_PHI_DISABLED, exceptionMessage);
-    }
-
-    @Test
-    public void testLogPhiEnabled() throws Exception {
-        System.setProperty(MllpComponent.MLLP_LOG_PHI_PROPERTY, "true");
-
-        String exceptionMessage = exception.getMessage();
-
-        assertEquals(EXCEPTION_MESSAGE_WITH_LOG_PHI_ENABLED, exceptionMessage);
-    }
-
-    @Test
-    public void testNullPayload() throws Exception {
-        final String expectedMessage = String.format("%s:\n\tMLLP Payload: null", EXCEPTION_MESSAGE);
-
-        exception = new MllpFrameException(EXCEPTION_MESSAGE, null);
-
-        System.setProperty(MllpComponent.MLLP_LOG_PHI_PROPERTY, "true");
-        String exceptionMessage = exception.getMessage();
-
-        assertEquals(expectedMessage, exceptionMessage);
-    }
-    @Test
-    public void testToString() throws Exception {
-        final String expectedString =
-                "org.apache.camel.component.mllp.MllpFrameException: "
-                        + "{mllpPayload="
-                        +      "MSH|^~\\&|APP_A|FAC_A|^org^sys||||ADT^A04^ADT_A04|||2.6<CR>"
-                        +      "PID|1||1100832^^^^PI||TEST^FIG||98765432|U||R|435 MAIN STREET^^LONGMONT^CO^80503||123-456-7890|||S<CR><CR><LF>"
-                        + "}";
-
-        assertEquals(expectedString, exception.toString());
-    }
-
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/91121843/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpProducerConsumerLoopbackTest.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpProducerConsumerLoopbackTest.java b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpProducerConsumerLoopbackTest.java
index 116e67c..e090a50 100644
--- a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpProducerConsumerLoopbackTest.java
+++ b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpProducerConsumerLoopbackTest.java
@@ -64,19 +64,16 @@ public class MllpProducerConsumerLoopbackTest extends CamelTestSupport {
     @Override
     protected RouteBuilder[] createRouteBuilders() throws Exception {
         RouteBuilder[] builders = new RouteBuilder[2];
-        final int groupInterval = 1000;
-        final boolean groupActiveOnly = false;
 
         builders[0] = new RouteBuilder() {
             String routeId = "mllp-receiver";
 
             public void configure() {
-                fromF("mllp://%s:%d?autoAck=true", mllpHost, mllpPort)
+                fromF("mllp://%s:%d?autoAck=true&readTimeout=1000", mllpHost, mllpPort)
                         .convertBodyTo(String.class)
                         .to(acknowledged)
                         .process(new PassthroughProcessor("after send to result"))
-                        .log(LoggingLevel.DEBUG, routeId, "Receiving: ${body}")
-                        .toF("log://%s?level=INFO&groupInterval=%d&groupActiveOnly=%b", routeId, groupInterval, groupActiveOnly);
+                        .log(LoggingLevel.INFO, routeId, "Receiving: ${body}");
             }
         };
 
@@ -85,10 +82,9 @@ public class MllpProducerConsumerLoopbackTest extends CamelTestSupport {
 
             public void configure() {
                 from(source.getDefaultEndpoint()).routeId(routeId)
-                        .log(LoggingLevel.DEBUG, routeId, "Sending: ${body}")
-                        .toF("mllp://%s:%d", mllpHost, mllpPort)
-                        .setBody(header(MllpConstants.MLLP_ACKNOWLEDGEMENT))
-                        .toF("log://%s?level=INFO&groupInterval=%d&groupActiveOnly=%b", routeId, groupInterval, groupActiveOnly);
+                        .log(LoggingLevel.INFO, routeId, "Sending: ${body}")
+                        .toF("mllp://%s:%d?readTimeout=5000", mllpHost, mllpPort)
+                        .setBody(header(MllpConstants.MLLP_ACKNOWLEDGEMENT));
             }
         };
 
@@ -107,7 +103,7 @@ public class MllpProducerConsumerLoopbackTest extends CamelTestSupport {
     }
 
     @Test
-    public void testLoopbackMultipleMessages() throws Exception {
+    public void testLoopbackWithMultipleMessages() throws Exception {
         int messageCount = 1000;
         acknowledged.expectedMessageCount(messageCount);
 

http://git-wip-us.apache.org/repos/asf/camel/blob/91121843/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpClientConsumerBlueprintTest.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpClientConsumerBlueprintTest.java b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpClientConsumerBlueprintTest.java
deleted file mode 100644
index e4ca285..0000000
--- a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpClientConsumerBlueprintTest.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.mllp;
-
-import org.apache.camel.EndpointInject;
-import org.apache.camel.component.mock.MockEndpoint;
-import org.apache.camel.test.blueprint.CamelBlueprintTestSupport;
-import org.junit.Ignore;
-
-@Ignore(value = "Not Yet Implemented")
-// TODO: Implement this
-public class MllpTcpClientConsumerBlueprintTest extends CamelBlueprintTestSupport {
-    @EndpointInject(uri = "mock://target")
-    MockEndpoint target;
-
-    @Override
-    protected String getBlueprintDescriptor() {
-        return "OSGI-INF/blueprint/mllp-tcp-client-consumer.xml";
-    }
-
-
-}

http://git-wip-us.apache.org/repos/asf/camel/blob/91121843/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpClientProducerAcknowledgementTest.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpClientProducerAcknowledgementTest.java b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpClientProducerAcknowledgementTest.java
index 125df76..f6df61c 100644
--- a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpClientProducerAcknowledgementTest.java
+++ b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpClientProducerAcknowledgementTest.java
@@ -28,12 +28,46 @@ import org.apache.camel.impl.DefaultCamelContext;
 import org.apache.camel.test.AvailablePortFinder;
 import org.apache.camel.test.junit.rule.mllp.MllpServerResource;
 import org.apache.camel.test.junit4.CamelTestSupport;
+
 import org.junit.Rule;
 import org.junit.Test;
 
-import static org.apache.camel.test.mllp.Hl7MessageGenerator.generateMessage;
+import static org.apache.camel.component.mllp.MllpEndpoint.END_OF_BLOCK;
+import static org.apache.camel.component.mllp.MllpEndpoint.START_OF_BLOCK;
 
 public class MllpTcpClientProducerAcknowledgementTest extends CamelTestSupport {
+    static final String TEST_MESSAGE =
+        "MSH|^~\\&|ADT|EPIC|JCAPS|CC|20161206193919|RISTECH|ADT^A08|00001|D|2.3^^|||||||" + '\r'
+            + "EVN|A08|20150107161440||REG_UPDATE_SEND_VISIT_MESSAGES_ON_PATIENT_CHANGES|RISTECH^RADIOLOGY^TECHNOLOGIST^^^^^^UCLA^^^^^RRMC||" + '\r'
+            + "PID|1|2100355^^^MRN^MRN|2100355^^^MRN^MRN||MDCLS9^MC9||19700109|F||U|111 HOVER STREET^^LOS ANGELES^CA^90032^USA^P^^LOS ANGELE|LOS ANGELE|"
+                + "(310)725-6952^P^PH^^^310^7256952||ENGLISH|U||60000013647|565-33-2222|||U||||||||N||" + '\r'
+            + "PD1|||UCLA HEALTH SYSTEM^^10|10002116^ADAMS^JOHN^D^^^^^EPIC^^^^PROVID||||||||||||||" + '\r'
+            + "NK1|1|DOE^MC9^^|OTH|^^^^^USA|(310)888-9999^^^^^310^8889999|(310)999-2222^^^^^310^9992222|Emergency Contact 1|||||||||||||||||||||||||||" + '\r'
+            + "PV1|1|OUTPATIENT|RR CT^^^1000^^^^^^^DEPID|EL|||017511^TOBIAS^JONATHAN^^^^^^EPIC^^^^PROVID|017511^TOBIAS^JONATHAN^^^^^^EPIC^^^^PROVID||||||"
+                + "CLR|||||60000013647|SELF|||||||||||||||||||||HOV_CONF|^^^1000^^^^^^^||20150107161438||||||||||" + '\r'
+            + "PV2||||||||20150107161438||||CT BRAIN W WO CONTRAST||||||||||N|||||||||||||||||||||||||||" + '\r'
+            + "ZPV||||||||||||20150107161438|||||||||" + '\r'
+            + "AL1|1||33361^NO KNOWN ALLERGIES^^NOTCOMPUTRITION^NO KNOWN ALLERGIES^EXTELG||||||" + '\r'
+            + "DG1|1|DX|784.0^Headache^DX|Headache||VISIT" + '\r'
+            + "GT1|1|1000235129|MDCLS9^MC9^^||111 HOVER STREET^^LOS ANGELES^CA^90032^USA^^^LOS ANGELE|(310)725-6952^^^^^310^7256952||19700109|F|P/F|SLF|"
+                + "565-33-2222|||||^^^^^USA|||UNKNOWN|||||||||||||||||||||||||||||" + '\r'
+            + "UB2||||||||" + '\r'
+            + '\n';
+
+    static final String EXPECTED_AA =
+        "MSH|^~\\&|JCAPS|CC|ADT|EPIC|20161206193919|RISTECH|ACK^A08|00001|D|2.3^^|||||||" + '\r'
+            + "MSA|AA|00001|" + '\r'
+            + '\n';
+
+    static final String EXPECTED_AR =
+        "MSH|^~\\&|JCAPS|CC|ADT|EPIC|20161206193919|RISTECH|ACK^A08|00001|D|2.3^^|||||||" + '\r'
+            + "MSA|AR|00001|" + '\r'
+            + '\n';
+
+    static final String EXPECTED_AE =
+        "MSH|^~\\&|JCAPS|CC|ADT|EPIC|20161206193919|RISTECH|ACK^A08|00001|D|2.3^^|||||||" + '\r'
+            + "MSA|AE|00001|" + '\r'
+            + '\n';
 
     @Rule
     public MllpServerResource mllpServer = new MllpServerResource("localhost", AvailablePortFinder.getNextAvailable());
@@ -41,15 +75,18 @@ public class MllpTcpClientProducerAcknowledgementTest extends CamelTestSupport {
     @EndpointInject(uri = "direct://source")
     ProducerTemplate source;
 
-    @EndpointInject(uri = "mock://complete")
-    MockEndpoint complete;
+    @EndpointInject(uri = "mock://failed")
+    MockEndpoint failed;
 
     @EndpointInject(uri = "mock://aa-ack")
-    MockEndpoint accept;
+    MockEndpoint aa;
     @EndpointInject(uri = "mock://ae-nack")
-    MockEndpoint error;
+    MockEndpoint ae;
     @EndpointInject(uri = "mock://ar-nack")
-    MockEndpoint reject;
+    MockEndpoint ar;
+
+    @EndpointInject(uri = "mock://invalid-ack")
+    MockEndpoint invalid;
 
     @Override
     protected CamelContext createCamelContext() throws Exception {
@@ -61,7 +98,6 @@ public class MllpTcpClientProducerAcknowledgementTest extends CamelTestSupport {
         return context;
     }
 
-
     @Override
     protected RouteBuilder createRouteBuilder() throws Exception {
         return new RouteBuilder() {
@@ -70,66 +106,181 @@ public class MllpTcpClientProducerAcknowledgementTest extends CamelTestSupport {
             public void configure() {
                 onException(MllpApplicationRejectAcknowledgementException.class)
                         .handled(true)
-                        .to(reject)
+                        .to(ar)
                         .log(LoggingLevel.ERROR, routeId, "AR Acknowledgement");
 
                 onException(MllpApplicationErrorAcknowledgementException.class)
                         .handled(true)
-                        .to(error)
+                        .to(ae)
                         .log(LoggingLevel.ERROR, routeId, "AE Acknowledgement");
 
+                onException(MllpInvalidAcknowledgementException.class)
+                        .handled(true)
+                        .to(invalid)
+                        .log(LoggingLevel.ERROR, routeId, "Invalid Acknowledgement");
+
                 onCompletion()
-                        .onCompleteOnly()
-                        .to(complete)
-                        .log(LoggingLevel.DEBUG, routeId, "AA Acknowledgement");
+                        .onFailureOnly()
+                        .to(failed)
+                        .log(LoggingLevel.DEBUG, routeId, "Exchange failed");
 
                 from(source.getDefaultEndpoint()).routeId(routeId)
                         .log(LoggingLevel.INFO, routeId, "Sending Message")
                         .toF("mllp://%s:%d", mllpServer.getListenHost(), mllpServer.getListenPort())
                         .log(LoggingLevel.INFO, routeId, "Received Acknowledgement")
-                        .to(accept);
+                        .to(aa);
             }
         };
     }
 
     @Test
     public void testApplicationAcceptAcknowledgement() throws Exception {
-        complete.setExpectedMessageCount(1);
-        accept.setExpectedMessageCount(1);
-        reject.setExpectedMessageCount(0);
-        error.setExpectedMessageCount(0);
+        aa.expectedBodiesReceived(TEST_MESSAGE);
+        aa.expectedHeaderReceived(MllpConstants.MLLP_ACKNOWLEDGEMENT_TYPE, "AA");
+        aa.expectedHeaderReceived(MllpConstants.MLLP_ACKNOWLEDGEMENT, EXPECTED_AA.getBytes());
+        aa.expectedHeaderReceived(MllpConstants.MLLP_ACKNOWLEDGEMENT_STRING, EXPECTED_AA);
+
+        failed.expectedMessageCount(0);
+        failed.setAssertPeriod(1000);
+
+        ae.expectedMessageCount(0);
+        ar.expectedMessageCount(0);
+        invalid.expectedMessageCount(0);
 
-        source.sendBody(generateMessage());
+        source.sendBody(TEST_MESSAGE);
 
         assertMockEndpointsSatisfied(15, TimeUnit.SECONDS);
     }
 
     @Test
     public void testApplicationRejectAcknowledgement() throws Exception {
-        complete.setExpectedMessageCount(1);
-        accept.setExpectedMessageCount(0);
-        reject.setExpectedMessageCount(1);
-        error.setExpectedMessageCount(0);
+        ar.expectedBodiesReceived(TEST_MESSAGE);
+        ar.expectedHeaderReceived(MllpConstants.MLLP_ACKNOWLEDGEMENT_TYPE, "AR"); // checked on ar: aa expects no messages in this test
+        ar.expectedHeaderReceived(MllpConstants.MLLP_ACKNOWLEDGEMENT, EXPECTED_AR.getBytes());
+        ar.expectedHeaderReceived(MllpConstants.MLLP_ACKNOWLEDGEMENT_STRING, EXPECTED_AR);
+
+        failed.expectedMessageCount(0);
+        failed.setAssertPeriod(1000);
+
+        aa.expectedMessageCount(0);
+        ae.expectedMessageCount(0);
+        invalid.expectedMessageCount(0);
 
         mllpServer.setSendApplicationRejectAcknowledgementModulus(1);
 
-        source.sendBody(generateMessage());
+        source.sendBody(TEST_MESSAGE);
 
         assertMockEndpointsSatisfied(15, TimeUnit.SECONDS);
     }
 
     @Test
     public void testApplicationErrorAcknowledgement() throws Exception {
-        complete.setExpectedMessageCount(1);
-        accept.setExpectedMessageCount(0);
-        reject.setExpectedMessageCount(0);
-        error.setExpectedMessageCount(1);
+        ae.expectedBodiesReceived(TEST_MESSAGE);
+        ae.expectedHeaderReceived(MllpConstants.MLLP_ACKNOWLEDGEMENT_TYPE, "AE"); // checked on ae: aa expects no messages in this test
+        ae.expectedHeaderReceived(MllpConstants.MLLP_ACKNOWLEDGEMENT, EXPECTED_AE.getBytes());
+        ae.expectedHeaderReceived(MllpConstants.MLLP_ACKNOWLEDGEMENT_STRING, EXPECTED_AE);
+
+        failed.expectedMessageCount(0);
+        failed.setAssertPeriod(1000);
+
+        aa.expectedMessageCount(0);
+        ar.expectedMessageCount(0);
+        invalid.expectedMessageCount(0);
 
         mllpServer.setSendApplicationErrorAcknowledgementModulus(1);
 
-        source.sendBody(generateMessage());
+        source.sendBody(TEST_MESSAGE);
+
+        assertMockEndpointsSatisfied(15, TimeUnit.SECONDS);
+    }
+
+    @Test
+    public void testEmptyAcknowledgement() throws Exception {
+        aa.expectedBodiesReceived(TEST_MESSAGE);
+        aa.expectedHeaderReceived(MllpConstants.MLLP_ACKNOWLEDGEMENT_TYPE, "");
+        aa.expectedHeaderReceived(MllpConstants.MLLP_ACKNOWLEDGEMENT, "".getBytes());
+        aa.expectedHeaderReceived(MllpConstants.MLLP_ACKNOWLEDGEMENT_STRING, "");
+
+        failed.expectedMessageCount(0);
+        failed.setAssertPeriod(1000);
+
+        ar.expectedMessageCount(0);
+        ae.expectedMessageCount(0);
+        invalid.expectedMessageCount(0);
+
+        mllpServer.setExcludeAcknowledgementModulus(1);
+
+        source.sendBody(TEST_MESSAGE);
+
+        assertMockEndpointsSatisfied(15, TimeUnit.SECONDS);
+    }
+
+    @Test
+    public void testInvalidAcknowledgement() throws Exception {
+        final String badAcknowledgement = "A VERY BAD ACKNOWLEDGEMENT";
+
+        aa.expectedBodiesReceived(TEST_MESSAGE);
+        aa.expectedHeaderReceived(MllpConstants.MLLP_ACKNOWLEDGEMENT_TYPE, "");
+        aa.expectedHeaderReceived(MllpConstants.MLLP_ACKNOWLEDGEMENT, badAcknowledgement.getBytes()); // byte[] form; the String form is asserted on the next line
+        aa.expectedHeaderReceived(MllpConstants.MLLP_ACKNOWLEDGEMENT_STRING, badAcknowledgement);
+
+        failed.expectedMessageCount(0);
+        failed.setAssertPeriod(1000);
+
+        ar.expectedMessageCount(0);
+        ae.expectedMessageCount(0);
+        invalid.expectedMessageCount(0);
+
+        mllpServer.setAcknowledgementString(badAcknowledgement);
+
+        source.sendBody(TEST_MESSAGE);
+
+        assertMockEndpointsSatisfied(15, TimeUnit.SECONDS);
+    }
+
+    @Test
+    public void testInvalidAcknowledgementContainingEmbeddedStartOfBlock() throws Exception {
+        final String badAcknowledgement = EXPECTED_AA.replaceFirst("RISTECH", "RISTECH" + START_OF_BLOCK);
+
+        aa.expectedBodiesReceived(TEST_MESSAGE);
+        aa.expectedHeaderReceived(MllpConstants.MLLP_ACKNOWLEDGEMENT_TYPE, "AA");
+        aa.expectedHeaderReceived(MllpConstants.MLLP_ACKNOWLEDGEMENT, badAcknowledgement.getBytes()); // byte[] form; the String form is asserted on the next line
+        aa.expectedHeaderReceived(MllpConstants.MLLP_ACKNOWLEDGEMENT_STRING, badAcknowledgement);
+
+        failed.expectedMessageCount(0);
+        failed.setAssertPeriod(1000);
+
+        ar.expectedMessageCount(0);
+        ae.expectedMessageCount(0);
+        invalid.expectedMessageCount(0);
+
+        mllpServer.setAcknowledgementString(badAcknowledgement);
+
+        source.sendBody(TEST_MESSAGE);
 
         assertMockEndpointsSatisfied(15, TimeUnit.SECONDS);
     }
 
+    @Test
+    public void testInvalidAcknowledgementContainingEmbeddedEndOfBlock() throws Exception {
+        final String badAcknowledgement = EXPECTED_AA.replaceFirst("RISTECH", "RISTECH" + END_OF_BLOCK);
+
+        aa.expectedBodiesReceived(TEST_MESSAGE);
+        aa.expectedHeaderReceived(MllpConstants.MLLP_ACKNOWLEDGEMENT_TYPE, "AA");
+        aa.expectedHeaderReceived(MllpConstants.MLLP_ACKNOWLEDGEMENT, badAcknowledgement.getBytes()); // byte[] form; the String form is asserted on the next line
+        aa.expectedHeaderReceived(MllpConstants.MLLP_ACKNOWLEDGEMENT_STRING, badAcknowledgement);
+
+        failed.expectedMessageCount(0);
+        failed.setAssertPeriod(1000);
+
+        ar.expectedMessageCount(0);
+        ae.expectedMessageCount(0);
+        invalid.expectedMessageCount(0);
+
+        mllpServer.setAcknowledgementString(badAcknowledgement);
+
+        source.sendBody(TEST_MESSAGE);
+
+        assertMockEndpointsSatisfied(15, TimeUnit.SECONDS);
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/91121843/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpClientProducerAcknowledgementValidationTest.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpClientProducerAcknowledgementValidationTest.java b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpClientProducerAcknowledgementValidationTest.java
new file mode 100644
index 0000000..1ff641c
--- /dev/null
+++ b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpClientProducerAcknowledgementValidationTest.java
@@ -0,0 +1,283 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mllp;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.LoggingLevel;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.test.AvailablePortFinder;
+import org.apache.camel.test.junit.rule.mllp.MllpServerResource;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Rule;
+import org.junit.Test;
+
+import static org.apache.camel.component.mllp.MllpEndpoint.END_OF_BLOCK;
+import static org.apache.camel.component.mllp.MllpEndpoint.START_OF_BLOCK;
+
+public class MllpTcpClientProducerAcknowledgementValidationTest extends CamelTestSupport {
+    static final String TEST_MESSAGE =
+        "MSH|^~\\&|ADT|EPIC|JCAPS|CC|20161206193919|RISTECH|ADT^A08|00001|D|2.3^^|||||||" + '\r'
+            + "EVN|A08|20150107161440||REG_UPDATE_SEND_VISIT_MESSAGES_ON_PATIENT_CHANGES|RISTECH^RADIOLOGY^TECHNOLOGIST^^^^^^UCLA^^^^^RRMC||" + '\r'
+            + "PID|1|2100355^^^MRN^MRN|2100355^^^MRN^MRN||MDCLS9^MC9||19700109|F||U|111 HOVER STREET^^LOS ANGELES^CA^90032^USA^P^^LOS ANGELE|LOS ANGELE|"
+                + "(310)725-6952^P^PH^^^310^7256952||ENGLISH|U||60000013647|565-33-2222|||U||||||||N||" + '\r'
+            + "PD1|||UCLA HEALTH SYSTEM^^10|10002116^ADAMS^JOHN^D^^^^^EPIC^^^^PROVID||||||||||||||" + '\r'
+            + "NK1|1|DOE^MC9^^|OTH|^^^^^USA|(310)888-9999^^^^^310^8889999|(310)999-2222^^^^^310^9992222|Emergency Contact 1|||||||||||||||||||||||||||" + '\r'
+            + "PV1|1|OUTPATIENT|RR CT^^^1000^^^^^^^DEPID|EL|||017511^TOBIAS^JONATHAN^^^^^^EPIC^^^^PROVID|017511^TOBIAS^JONATHAN^^^^^^EPIC^^^^PROVID||||||"
+                + "CLR|||||60000013647|SELF|||||||||||||||||||||HOV_CONF|^^^1000^^^^^^^||20150107161438||||||||||" + '\r'
+            + "PV2||||||||20150107161438||||CT BRAIN W WO CONTRAST||||||||||N|||||||||||||||||||||||||||" + '\r'
+            + "ZPV||||||||||||20150107161438|||||||||" + '\r'
+            + "AL1|1||33361^NO KNOWN ALLERGIES^^NOTCOMPUTRITION^NO KNOWN ALLERGIES^EXTELG||||||" + '\r'
+            + "DG1|1|DX|784.0^Headache^DX|Headache||VISIT" + '\r'
+            + "GT1|1|1000235129|MDCLS9^MC9^^||111 HOVER STREET^^LOS ANGELES^CA^90032^USA^^^LOS ANGELE|(310)725-6952^^^^^310^7256952||19700109|F|P/F|SLF|"
+                + "565-33-2222|||||^^^^^USA|||UNKNOWN|||||||||||||||||||||||||||||" + '\r'
+            + "UB2||||||||" + '\r'
+            + '\n';
+
+    static final String EXPECTED_AA =
+        "MSH|^~\\&|JCAPS|CC|ADT|EPIC|20161206193919|RISTECH|ACK^A08|00001|D|2.3^^|||||||" + '\r'
+            + "MSA|AA|00001|" + '\r'
+            + '\n';
+
+    static final String EXPECTED_AR =
+        "MSH|^~\\&|JCAPS|CC|ADT|EPIC|20161206193919|RISTECH|ACK^A08|00001|D|2.3^^|||||||" + '\r'
+            + "MSA|AR|00001|" + '\r'
+            + '\n';
+
+    static final String EXPECTED_AE =
+        "MSH|^~\\&|JCAPS|CC|ADT|EPIC|20161206193919|RISTECH|ACK^A08|00001|D|2.3^^|||||||" + '\r'
+            + "MSA|AE|00001|" + '\r'
+            + '\n';
+
+    @Rule
+    public MllpServerResource mllpServer = new MllpServerResource("localhost", AvailablePortFinder.getNextAvailable());
+
+    @EndpointInject(uri = "direct://source")
+    ProducerTemplate source;
+
+    @EndpointInject(uri = "mock://failed")
+    MockEndpoint failed;
+
+    @EndpointInject(uri = "mock://aa-ack")
+    MockEndpoint aa;
+    @EndpointInject(uri = "mock://ae-nack")
+    MockEndpoint ae;
+    @EndpointInject(uri = "mock://ar-nack")
+    MockEndpoint ar;
+
+    @EndpointInject(uri = "mock://invalid-ack")
+    MockEndpoint invalid;
+
+    @Override
+    protected CamelContext createCamelContext() throws Exception {
+        DefaultCamelContext context = (DefaultCamelContext) super.createCamelContext();
+
+        context.setUseMDCLogging(true);
+        context.setName(this.getClass().getSimpleName());
+
+        return context;
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            String routeId = "mllp-sender";
+
+            public void configure() {
+                onException(MllpApplicationRejectAcknowledgementException.class)
+                        .handled(true)
+                        .to(ar)
+                        .log(LoggingLevel.ERROR, routeId, "AR Acknowledgement");
+
+                onException(MllpApplicationErrorAcknowledgementException.class)
+                        .handled(true)
+                        .to(ae)
+                        .log(LoggingLevel.ERROR, routeId, "AE Acknowledgement");
+
+                onException(MllpInvalidAcknowledgementException.class)
+                        .handled(true)
+                        .to(invalid)
+                        .log(LoggingLevel.ERROR, routeId, "Invalid Acknowledgement");
+
+                onCompletion()
+                        .onFailureOnly()
+                        .to(failed)
+                        .log(LoggingLevel.DEBUG, routeId, "Exchange failed");
+
+                from(source.getDefaultEndpoint()).routeId(routeId)
+                        .log(LoggingLevel.INFO, routeId, "Sending Message")
+                        .toF("mllp://%s:%d?validatePayload=true", mllpServer.getListenHost(), mllpServer.getListenPort())
+                        .log(LoggingLevel.INFO, routeId, "Received Acknowledgement")
+                        .to(aa);
+            }
+        };
+    }
+
+    @Test
+    public void testApplicationAcceptAcknowledgement() throws Exception {
+        aa.expectedBodiesReceived(TEST_MESSAGE);
+        aa.expectedHeaderReceived(MllpConstants.MLLP_ACKNOWLEDGEMENT_TYPE, "AA");
+        aa.expectedHeaderReceived(MllpConstants.MLLP_ACKNOWLEDGEMENT, EXPECTED_AA.getBytes());
+        aa.expectedHeaderReceived(MllpConstants.MLLP_ACKNOWLEDGEMENT_STRING, EXPECTED_AA);
+
+        failed.expectedMessageCount(0);
+        failed.setAssertPeriod(1000);
+
+        ae.expectedMessageCount(0);
+        ar.expectedMessageCount(0);
+        invalid.expectedMessageCount(0);
+
+        source.sendBody(TEST_MESSAGE);
+
+        assertMockEndpointsSatisfied(15, TimeUnit.SECONDS);
+    }
+
+    @Test
+    public void testApplicationRejectAcknowledgement() throws Exception {
+        ar.expectedBodiesReceived(TEST_MESSAGE);
+        ar.expectedHeaderReceived(MllpConstants.MLLP_ACKNOWLEDGEMENT_TYPE, "AR"); // checked on ar: aa expects no messages in this test
+        ar.expectedHeaderReceived(MllpConstants.MLLP_ACKNOWLEDGEMENT, EXPECTED_AR.getBytes());
+        ar.expectedHeaderReceived(MllpConstants.MLLP_ACKNOWLEDGEMENT_STRING, EXPECTED_AR);
+
+        failed.expectedMessageCount(0);
+        failed.setAssertPeriod(1000);
+
+        aa.expectedMessageCount(0);
+        ae.expectedMessageCount(0);
+        invalid.expectedMessageCount(0);
+
+        mllpServer.setSendApplicationRejectAcknowledgementModulus(1);
+
+        source.sendBody(TEST_MESSAGE);
+
+        assertMockEndpointsSatisfied(15, TimeUnit.SECONDS);
+    }
+
+    @Test
+    public void testApplicationErrorAcknowledgement() throws Exception {
+        ae.expectedBodiesReceived(TEST_MESSAGE);
+        ae.expectedHeaderReceived(MllpConstants.MLLP_ACKNOWLEDGEMENT_TYPE, "AE"); // checked on ae: aa expects no messages in this test
+        ae.expectedHeaderReceived(MllpConstants.MLLP_ACKNOWLEDGEMENT, EXPECTED_AE.getBytes());
+        ae.expectedHeaderReceived(MllpConstants.MLLP_ACKNOWLEDGEMENT_STRING, EXPECTED_AE);
+
+        failed.expectedMessageCount(0);
+        failed.setAssertPeriod(1000);
+
+        aa.expectedMessageCount(0);
+        ar.expectedMessageCount(0);
+        invalid.expectedMessageCount(0);
+
+        mllpServer.setSendApplicationErrorAcknowledgementModulus(1);
+
+        source.sendBody(TEST_MESSAGE);
+
+        assertMockEndpointsSatisfied(15, TimeUnit.SECONDS);
+    }
+
+    @Test
+    public void testEmptyAcknowledgement() throws Exception {
+        invalid.expectedBodiesReceived(TEST_MESSAGE);
+        invalid.expectedHeaderReceived(MllpConstants.MLLP_ACKNOWLEDGEMENT, "".getBytes());
+        invalid.expectedHeaderReceived(MllpConstants.MLLP_ACKNOWLEDGEMENT_STRING, "");
+
+        failed.expectedMessageCount(0);
+        failed.setAssertPeriod(1000);
+
+        aa.expectedMessageCount(0);
+        ae.expectedMessageCount(0);
+        ar.expectedMessageCount(0);
+
+        mllpServer.setExcludeAcknowledgementModulus(1);
+
+        source.sendBody(TEST_MESSAGE);
+
+        assertMockEndpointsSatisfied(15, TimeUnit.SECONDS);
+    }
+
+    @Test
+    public void testInvalidAcknowledgement() throws Exception {
+        final String badAcknowledgement = "A VERY BAD ACKNOWLEDGEMENT";
+
+        invalid.expectedBodiesReceived(TEST_MESSAGE);
+        invalid.expectedHeaderReceived(MllpConstants.MLLP_ACKNOWLEDGEMENT, badAcknowledgement.getBytes()); // byte[] form; the String form is asserted on the next line
+        invalid.expectedHeaderReceived(MllpConstants.MLLP_ACKNOWLEDGEMENT_STRING, badAcknowledgement);
+
+        failed.expectedMessageCount(0);
+        failed.setAssertPeriod(1000);
+
+        aa.expectedMessageCount(0);
+        ae.expectedMessageCount(0);
+        ar.expectedMessageCount(0);
+
+        mllpServer.setAcknowledgementString(badAcknowledgement);
+
+        source.sendBody(TEST_MESSAGE);
+
+        assertMockEndpointsSatisfied(15, TimeUnit.SECONDS);
+    }
+
+    @Test
+    public void testInvalidAcknowledgementContainingEmbeddedStartOfBlock() throws Exception {
+        final String badAcknowledgement = EXPECTED_AA.replaceFirst("RISTECH", "RISTECH" + START_OF_BLOCK);
+
+        invalid.expectedBodiesReceived(TEST_MESSAGE);
+        aa.expectedHeaderReceived(MllpConstants.MLLP_ACKNOWLEDGEMENT_TYPE, "AA");
+        invalid.expectedHeaderReceived(MllpConstants.MLLP_ACKNOWLEDGEMENT, badAcknowledgement.getBytes()); // byte[] form; the String form is asserted on the next line
+        invalid.expectedHeaderReceived(MllpConstants.MLLP_ACKNOWLEDGEMENT_STRING, badAcknowledgement);
+
+        failed.expectedMessageCount(0);
+        failed.setAssertPeriod(1000);
+
+        aa.expectedMessageCount(0);
+        ae.expectedMessageCount(0);
+        ar.expectedMessageCount(0);
+
+        mllpServer.setAcknowledgementString(badAcknowledgement);
+
+        source.sendBody(TEST_MESSAGE);
+
+        assertMockEndpointsSatisfied(15, TimeUnit.SECONDS);
+    }
+
+    @Test
+    public void testInvalidAcknowledgementContainingEmbeddedEndOfBlock() throws Exception {
+        final String badAcknowledgement = EXPECTED_AA.replaceFirst("RISTECH", "RISTECH" + END_OF_BLOCK);
+
+        invalid.expectedBodiesReceived(TEST_MESSAGE);
+        aa.expectedHeaderReceived(MllpConstants.MLLP_ACKNOWLEDGEMENT_TYPE, "AA");
+        invalid.expectedHeaderReceived(MllpConstants.MLLP_ACKNOWLEDGEMENT, badAcknowledgement.getBytes()); // byte[] form; the String form is asserted on the next line
+        invalid.expectedHeaderReceived(MllpConstants.MLLP_ACKNOWLEDGEMENT_STRING, badAcknowledgement);
+
+        failed.expectedMessageCount(0);
+        failed.setAssertPeriod(1000);
+
+        aa.expectedMessageCount(0);
+        ae.expectedMessageCount(0);
+        ar.expectedMessageCount(0);
+
+        mllpServer.setAcknowledgementString(badAcknowledgement);
+
+        source.sendBody(TEST_MESSAGE);
+
+        assertMockEndpointsSatisfied(15, TimeUnit.SECONDS);
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/91121843/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpClientProducerBlueprintTest.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpClientProducerBlueprintTest.java b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpClientProducerBlueprintTest.java
index 9b10a14..6d308bf 100644
--- a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpClientProducerBlueprintTest.java
+++ b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpClientProducerBlueprintTest.java
@@ -41,10 +41,10 @@ public class MllpTcpClientProducerBlueprintTest extends CamelBlueprintTestSuppor
 
     final String sourceUri = "direct://source";
     final String mockAcknowledgedUri = "mock://acknowledged";
-    final String mockTimeoutUri = "mock://timeout-ex";
+    final String mockTimeoutUri = "mock://timeoutError-ex";
     final String mockAeExUri = "mock://ae-ack";
     final String mockArExUri = "mock://ar-ack";
-    final String mockFrameExUri = "mock://frame-ex";
+    final String mockFrameExUri = "mock://frameError-ex";
 
     @EndpointInject(uri = sourceUri)
     ProducerTemplate source;
@@ -101,11 +101,11 @@ public class MllpTcpClientProducerBlueprintTest extends CamelBlueprintTestSuppor
     @Test()
     public void testSendMultipleMessages() throws Exception {
         int messageCount = 500;
-        acknowledged.setExpectedMessageCount(messageCount);
-        timeout.setExpectedMessageCount(0);
-        frame.setExpectedMessageCount(0);
-        ae.setExpectedMessageCount(0);
-        ar.setExpectedMessageCount(0);
+        acknowledged.expectedMessageCount(messageCount);
+        timeout.expectedMessageCount(0);
+        frame.expectedMessageCount(0);
+        ae.expectedMessageCount(0);
+        ar.expectedMessageCount(0);
 
         // Uncomment one of these lines to see the NACKs handled
         // mllpServer.setSendApplicationRejectAcknowledgementModulus(10);

http://git-wip-us.apache.org/repos/asf/camel/blob/91121843/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpClientProducerConnectionErrorTest.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpClientProducerConnectionErrorTest.java b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpClientProducerConnectionErrorTest.java
new file mode 100644
index 0000000..897b74b
--- /dev/null
+++ b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpClientProducerConnectionErrorTest.java
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mllp;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.LoggingLevel;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.NotifyBuilder;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.test.AvailablePortFinder;
+import org.apache.camel.test.junit.rule.mllp.MllpServerResource;
+import org.apache.camel.test.junit4.CamelTestSupport;
+
+import org.junit.Rule;
+import org.junit.Test;
+
+import static org.apache.camel.test.mllp.Hl7MessageGenerator.generateMessage;
+
+public class MllpTcpClientProducerConnectionErrorTest extends CamelTestSupport {
+    @Rule
+    public MllpServerResource mllpServer = new MllpServerResource("localhost", AvailablePortFinder.getNextAvailable()); // server rule on localhost, port picked by AvailablePortFinder
+
+    @EndpointInject(uri = "direct://source")
+    ProducerTemplate source; // used by the tests to send messages into the route
+
+    @EndpointInject(uri = "mock://complete")
+    MockEndpoint complete; // last step of the mllp-sender route
+
+    @EndpointInject(uri = "mock://write-ex")
+    MockEndpoint writeEx; // target of the MllpWriteException handler
+
+    @EndpointInject(uri = "mock://receive-ex")
+    MockEndpoint receiveEx; // target of the MllpReceiveAcknowledgementException handler
+
+    @Override
+    protected CamelContext createCamelContext() throws Exception { // enables MDC logging and names the context after this class
+        DefaultCamelContext context = (DefaultCamelContext) super.createCamelContext();
+
+        context.setUseMDCLogging(true);
+        context.setName(this.getClass().getSimpleName());
+
+        return context;
+    }
+
+    // Builds the "mllp-sender" route and the two exception handlers that feed the writeEx/receiveEx mocks.
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            String routeId = "mllp-sender";
+
+            public void configure() {
+                onException(MllpWriteException.class) // write failures: mark handled, record on writeEx, log, stop
+                        .handled(true)
+                        .to(writeEx)
+                        .log(LoggingLevel.ERROR, routeId, "Write Error")
+                        .stop();
+
+                onException(MllpReceiveAcknowledgementException.class) // acknowledgement-receive failures: mark handled, record on receiveEx, log, stop
+                        .handled(true)
+                        .to(receiveEx)
+                        .log(LoggingLevel.ERROR, routeId, "Receive Error")
+                        .stop();
+
+                from(source.getDefaultEndpoint()).routeId(routeId)
+                        .log(LoggingLevel.INFO, routeId, "Sending Message")
+                        .toF("mllp://%s:%d", mllpServer.getListenHost(), mllpServer.getListenPort()) // mllp endpoint pointed at the mllpServer rule
+                        .log(LoggingLevel.INFO, routeId, "Received Acknowledgement")
+                        .to(complete);
+            }
+        };
+    }
+
+    @Test
+    public void testConnectionClosedBeforeSendingHL7Message() throws Exception {
+        complete.expectedMessageCount(1); // expected: one of the two sends reaches the end of the route
+        writeEx.expectedMessageCount(0);
+        receiveEx.expectedMessageCount(1); // expected: the other send ends in the receive-error handler
+
+        NotifyBuilder done = new NotifyBuilder(context).whenCompleted(2).create(); // condition on two completed exchanges
+
+        // Need to send one message to get the connection established
+        source.sendBody(generateMessage());
+
+        mllpServer.closeClientConnections(); // presumably closes the server side of the established connection - confirm in MllpServerResource
+        source.sendBody(generateMessage());
+
+        assertTrue("Should have completed an exchange", done.matches(5, TimeUnit.SECONDS));
+
+        assertMockEndpointsSatisfied(5, TimeUnit.SECONDS);
+    }
+
+    @Test()
+    public void testConnectionResetBeforeSendingHL7Message() throws Exception {
+        complete.expectedMessageCount(1); // expected: one of the two sends reaches the end of the route
+        writeEx.expectedMessageCount(1); // expected: the other send ends in the write-error handler
+        receiveEx.expectedMessageCount(0);
+
+        NotifyBuilder done = new NotifyBuilder(context).whenCompleted(2).create(); // condition on two completed exchanges
+
+        // Need to send one message to get the connection established
+        source.sendBody(generateMessage());
+
+        mllpServer.resetClientConnections(); // presumably resets the server side of the established connection - confirm in MllpServerResource
+
+        source.sendBody(generateMessage());
+
+        assertTrue("Should have completed an exchange", done.matches(5, TimeUnit.SECONDS));
+
+        assertMockEndpointsSatisfied(5, TimeUnit.SECONDS);
+    }
+
+    @Test()
+    public void testConnectionClosedBeforeReadingAcknowledgement() throws Exception {
+        complete.expectedMessageCount(0);
+        writeEx.expectedMessageCount(0);
+        receiveEx.expectedMessageCount(1); // expected: the single send ends in the receive-error handler
+
+        mllpServer.setCloseSocketBeforeAcknowledgementModulus(1); // presumably: close the socket before every acknowledgement - confirm in MllpServerResource
+
+        NotifyBuilder done = new NotifyBuilder(context).whenCompleted(1).create(); // condition on one completed exchange
+
+        source.sendBody(generateMessage());
+
+        assertTrue("Should have completed an exchange", done.matches(5, TimeUnit.SECONDS));
+
+        assertMockEndpointsSatisfied(5, TimeUnit.SECONDS);
+    }
+
+    @Test()
+    public void testConnectionResetBeforeReadingAcknowledgement() throws Exception {
+        complete.expectedMessageCount(0);
+        writeEx.expectedMessageCount(0);
+        receiveEx.expectedMessageCount(1); // expected: the single send ends in the receive-error handler
+
+        mllpServer.setResetSocketBeforeAcknowledgementModulus(1); // presumably: reset the socket before every acknowledgement - confirm in MllpServerResource
+
+        NotifyBuilder done = new NotifyBuilder(context).whenCompleted(1).create(); // condition on one completed exchange
+
+        source.sendBody(generateMessage());
+
+        assertTrue("Should have completed an exchange", done.matches(5, TimeUnit.SECONDS));
+
+        assertMockEndpointsSatisfied(5, TimeUnit.SECONDS);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/91121843/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpClientProducerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpClientProducerTest.java b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpClientProducerTest.java
index 6edc48c..0a98daa 100644
--- a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpClientProducerTest.java
+++ b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpClientProducerTest.java
@@ -29,6 +29,7 @@ import org.apache.camel.impl.DefaultCamelContext;
 import org.apache.camel.test.AvailablePortFinder;
 import org.apache.camel.test.junit.rule.mllp.MllpServerResource;
 import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 
@@ -44,16 +45,8 @@ public class MllpTcpClientProducerTest extends CamelTestSupport {
     @EndpointInject(uri = "mock://acknowledged")
     MockEndpoint acknowledged;
 
-    @EndpointInject(uri = "mock://timeout-ex")
-    MockEndpoint timeout;
-
-    @EndpointInject(uri = "mock://frame-ex")
-    MockEndpoint frame;
-
-    @Override
-    public String isMockEndpoints() {
-        return "log://netty-mllp-sender-throughput*";
-    }
+    @EndpointInject(uri = "mock://timeout-error")
+    MockEndpoint timeoutError;
 
     @Override
     protected CamelContext createCamelContext() throws Exception {
@@ -75,34 +68,26 @@ public class MllpTcpClientProducerTest extends CamelTestSupport {
             public void configure() throws Exception {
                 errorHandler(
                         defaultErrorHandler().allowRedeliveryWhileStopping(false));
-                onException(MllpFrameException.class)
-                        .handled(true)
-                        .logHandled(false)
-                        .to(frame);
-                onException(MllpTimeoutException.class)
+
+                onException(MllpAcknowledgementTimeoutException.class)
                         .handled(true)
                         .logHandled(false)
-                        .to(timeout);
-                onCompletion()
-                        .onFailureOnly().log(LoggingLevel.ERROR, "Processing Failed");
+                        .to(timeoutError);
+
                 from(source.getDefaultEndpoint())
                         .routeId("mllp-sender-test-route")
                         .log(LoggingLevel.INFO, "Sending Message: $simple{header[CamelHL7MessageControl]}")
                         .toF("mllp://%s:%d?connectTimeout=%d&receiveTimeout=%d",
                                 mllpServer.getListenHost(), mllpServer.getListenPort(), connectTimeout, responseTimeout)
                         .to(acknowledged);
-                from("direct://handle-timeout")
-                        .log(LoggingLevel.ERROR, "Response Timeout")
-                        .rollback();
             }
         };
     }
 
     @Test
     public void testSendSingleMessage() throws Exception {
-        acknowledged.setExpectedMessageCount(1);
-        timeout.setExpectedMessageCount(0);
-        frame.setExpectedMessageCount(0);
+        acknowledged.expectedMessageCount(1);
+        timeoutError.expectedMessageCount(0);
 
         source.sendBody(generateMessage());
 
@@ -113,9 +98,8 @@ public class MllpTcpClientProducerTest extends CamelTestSupport {
     @Test
     public void testSendMultipleMessages() throws Exception {
         int messageCount = 5;
-        acknowledged.setExpectedMessageCount(messageCount);
-        timeout.setExpectedMessageCount(0);
-        frame.setExpectedMessageCount(0);
+        acknowledged.expectedMessageCount(messageCount);
+        timeoutError.expectedMessageCount(0);
 
         NotifyBuilder[] complete = new NotifyBuilder[messageCount];
         for (int i = 0; i < messageCount; ++i) {
@@ -134,9 +118,8 @@ public class MllpTcpClientProducerTest extends CamelTestSupport {
     @Test
     public void testNoResponseOnFirstMessage() throws Exception {
         int sendMessageCount = 5;
-        acknowledged.setExpectedMessageCount(sendMessageCount - 1);
-        timeout.expectedMessageCount(1);
-        frame.setExpectedMessageCount(0);
+        acknowledged.expectedMessageCount(sendMessageCount - 1);
+        timeoutError.expectedMessageCount(1);
 
         NotifyBuilder[] complete = new NotifyBuilder[sendMessageCount];
         for (int i = 0; i < sendMessageCount; ++i) {
@@ -161,9 +144,8 @@ public class MllpTcpClientProducerTest extends CamelTestSupport {
     @Test
     public void testNoResponseOnNthMessage() throws Exception {
         int sendMessageCount = 3;
-        acknowledged.setExpectedMessageCount(sendMessageCount - 1);
-        timeout.expectedMessageCount(1);
-        frame.setExpectedMessageCount(0);
+        acknowledged.expectedMessageCount(sendMessageCount - 1);
+        timeoutError.expectedMessageCount(1);
 
         NotifyBuilder[] complete = new NotifyBuilder[sendMessageCount];
         for (int i = 0; i < sendMessageCount; ++i) {
@@ -183,7 +165,8 @@ public class MllpTcpClientProducerTest extends CamelTestSupport {
     @Test
     public void testMissingEndOfDataByte() throws Exception {
         int sendMessageCount = 3;
-        acknowledged.setExpectedMessageCount(sendMessageCount - 1);
+        acknowledged.expectedMessageCount(sendMessageCount - 1);
+        timeoutError.expectedMessageCount(1);
 
         NotifyBuilder[] complete = new NotifyBuilder[sendMessageCount];
         for (int i = 0; i < sendMessageCount; ++i) {
@@ -203,7 +186,8 @@ public class MllpTcpClientProducerTest extends CamelTestSupport {
     @Test
     public void testMissingEndOfBlockByte() throws Exception {
         int sendMessageCount = 3;
-        acknowledged.setExpectedMessageCount(sendMessageCount - 1);
+        acknowledged.expectedMessageCount(sendMessageCount - 1);
+        timeoutError.expectedMessageCount(1);
 
         NotifyBuilder[] complete = new NotifyBuilder[sendMessageCount];
         for (int i = 0; i < sendMessageCount; ++i) {
@@ -221,19 +205,25 @@ public class MllpTcpClientProducerTest extends CamelTestSupport {
     }
 
     @Test
-    public void testApplicationAcceptAcknowledgement() throws Exception {
-        int sendMessageCount = 5;
-        acknowledged.setExpectedMessageCount(sendMessageCount);
+    public void testAcknowledgementReceiveTimeout() throws Exception {
+        acknowledged.expectedMessageCount(0);
+        timeoutError.expectedMessageCount(1);
 
-        NotifyBuilder[] complete = new NotifyBuilder[sendMessageCount];
-        for (int i = 0; i < sendMessageCount; ++i) {
-            complete[i] = new NotifyBuilder(context).whenDone(i + 1).create();
-        }
+        mllpServer.disableResponse(1);
 
-        for (int i = 0; i < sendMessageCount; ++i) {
-            source.sendBody(generateMessage(i + 1));
-            assertTrue("Messege " + i + " not completed", complete[i].matches(1, TimeUnit.SECONDS));
-        }
+        source.sendBody(generateMessage());
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Test
+    public void testAcknowledgementReadTimeout() throws Exception {
+        acknowledged.expectedMessageCount(0);
+        timeoutError.expectedMessageCount(1);
+
+        mllpServer.setDelayDuringAcknowledgement(15000);
+
+        source.sendBody(generateMessage());
 
         assertMockEndpointsSatisfied(15, TimeUnit.SECONDS);
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/91121843/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerAcknowledgementTest.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerAcknowledgementTest.java b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerAcknowledgementTest.java
index e2971dd..557d462 100644
--- a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerAcknowledgementTest.java
+++ b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerAcknowledgementTest.java
@@ -101,7 +101,8 @@ public class MllpTcpServerConsumerAcknowledgementTest extends CamelTestSupport {
 
         final String expectedAcknowledgement =
                 "MSH|^~\\&|^org^sys||APP_A|FAC_A|||ACK^A04^ADT_A04|||2.6" + '\r'
-                        + "MSA|AA|" + '\r' + '\n';
+                        + "MSA|AA|" + '\r'
+                        + '\r' + '\n';
 
         result.expectedBodiesReceived(testMessage);
         result.expectedHeaderReceived(MLLP_SENDING_APPLICATION, "APP_A");
@@ -141,7 +142,7 @@ public class MllpTcpServerConsumerAcknowledgementTest extends CamelTestSupport {
 
         final String expectedAcknowledgement =
                 "MSH|^~\\&|^org^sys||APP_A|FAC_A|||ACK^A04^ADT_A04|||2.6" + '\r'
-                        + "MSA|AA|"
+                        + "MSA|AA|" + '\r'
                         + '\r' + '\n';
 
         result.expectedBodiesReceived(testMessage);

http://git-wip-us.apache.org/repos/asf/camel/blob/91121843/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerConnectionTest.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerConnectionTest.java b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerConnectionTest.java
index ce89b40..239a178 100644
--- a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerConnectionTest.java
+++ b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerConnectionTest.java
@@ -30,6 +30,9 @@ import org.apache.camel.test.junit4.CamelTestSupport;
 import org.junit.Rule;
 import org.junit.Test;
 
+import static org.apache.camel.component.mllp.MllpTcpServerConsumer.SOCKET_STARTUP_TEST_READ_TIMEOUT;
+import static org.apache.camel.component.mllp.MllpTcpServerConsumer.SOCKET_STARTUP_TEST_WAIT;
+
 public class MllpTcpServerConsumerConnectionTest extends CamelTestSupport {
     static final int RECEIVE_TIMEOUT = 500;
 
@@ -83,11 +86,12 @@ public class MllpTcpServerConsumerConnectionTest extends CamelTestSupport {
      * @throws Exception
      */
     @Test
-    public void testConnectWithoutData() throws Exception {
+    public void testConnectThenCloseWithoutData() throws Exception {
         int connectionCount = 10;
         long connectionMillis = 200;
 
         result.setExpectedCount(0);
+        result.setAssertPeriod(SOCKET_STARTUP_TEST_WAIT + SOCKET_STARTUP_TEST_READ_TIMEOUT);
 
         addTestRoute(-1);
 
@@ -97,6 +101,35 @@ public class MllpTcpServerConsumerConnectionTest extends CamelTestSupport {
             mllpClient.close();
         }
 
+        // Connect one more time and allow a client thread to start
+        mllpClient.connect();
+        Thread.sleep(SOCKET_STARTUP_TEST_WAIT + SOCKET_STARTUP_TEST_READ_TIMEOUT + 1000);
+        mllpClient.close();
+
+        assertMockEndpointsSatisfied(15, TimeUnit.SECONDS);
+    }
+
+    @Test
+    public void testConnectThenResetWithoutData() throws Exception {
+        int connectionCount = 10;
+        long connectionMillis = 200;
+
+        result.setExpectedCount(0);
+        result.setAssertPeriod(SOCKET_STARTUP_TEST_WAIT + SOCKET_STARTUP_TEST_READ_TIMEOUT);
+
+        addTestRoute(-1);
+
+        for (int i = 1; i <= connectionCount; ++i) {
+            mllpClient.connect();
+            Thread.sleep(connectionMillis);
+            mllpClient.reset();
+        }
+
+        // Connect one more time and allow a client thread to start
+        mllpClient.connect();
+        Thread.sleep(SOCKET_STARTUP_TEST_WAIT + SOCKET_STARTUP_TEST_READ_TIMEOUT + 1000);
+        mllpClient.reset();
+
         assertMockEndpointsSatisfied(15, TimeUnit.SECONDS);
     }
 


Mime
View raw message