camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject [3/6] camel git commit: Added camel-mllp component
Date Wed, 30 Dec 2015 09:36:51 GMT
http://git-wip-us.apache.org/repos/asf/camel/blob/bd1661b2/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java
new file mode 100644
index 0000000..362a30f
--- /dev/null
+++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java
@@ -0,0 +1,550 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mllp;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.SocketTimeoutException;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.Message;
+import org.apache.camel.Processor;
+import org.apache.camel.component.mllp.impl.MllpUtil;
+import org.apache.camel.converter.IOConverter;
+import org.apache.camel.impl.DefaultConsumer;
+import org.apache.camel.processor.mllp.Hl7AcknowledgementGenerationException;
+import org.apache.camel.processor.mllp.Hl7AcknowledgementGenerator;
+import org.apache.camel.util.IOHelper;
+
+import static org.apache.camel.component.mllp.MllpConstants.*;
+import static org.apache.camel.component.mllp.MllpEndpoint.SEGMENT_DELIMITER;
+import static org.apache.camel.component.mllp.MllpEndpoint.START_OF_BLOCK;
+
+/**
+ * The MLLP consumer.
+ */
+public class MllpTcpServerConsumer extends DefaultConsumer {
+    ServerSocketThread serverSocketThread;
+
+    List<ClientSocketThread> clientThreads = new LinkedList<>();
+
+    private final MllpEndpoint endpoint;
+
+    /**
+     * Creates a consumer for the given MLLP endpoint.
+     *
+     * @param endpoint  the endpoint whose socket settings are read by this consumer
+     * @param processor the processor that is handed every exchange created by this consumer
+     */
+    public MllpTcpServerConsumer(MllpEndpoint endpoint, Processor processor) {
+        super(endpoint, processor);
+        this.endpoint = endpoint;
+        log.trace("MllpTcpServerConsumer(endpoint, processor)");
+    }
+
+    /**
+     * Opens and binds the ServerSocket described by the endpoint and starts the acceptor thread.
+     * <p/>
+     * If configuring or binding the ServerSocket fails, the socket is closed before the failure is propagated
+     * so that a failed start does not leak the underlying descriptor.
+     *
+     * @throws Exception if the ServerSocket cannot be configured or bound, or the parent start fails
+     */
+    @Override
+    protected void doStart() throws Exception {
+        log.debug("doStart() - creating acceptor thread");
+
+        ServerSocket serverSocket = new ServerSocket();
+        try {
+            if (null != endpoint.receiveBufferSize) {
+                serverSocket.setReceiveBufferSize(endpoint.receiveBufferSize);
+            }
+
+            serverSocket.setReuseAddress(endpoint.reuseAddress);
+
+            // Accept Timeout
+            serverSocket.setSoTimeout(endpoint.acceptTimeout);
+
+            InetSocketAddress socketAddress = new InetSocketAddress(endpoint.getHostname(), endpoint.getPort());
+            serverSocket.bind(socketAddress, endpoint.backlog);
+        } catch (Exception startEx) {
+            // Nothing owns the socket yet, so it has to be released here
+            try {
+                serverSocket.close();
+            } catch (IOException closeEx) {
+                log.warn("Exception encountered closing ServerSocket after a failed start", closeEx);
+            }
+            throw startEx;
+        }
+
+        serverSocketThread = new ServerSocketThread(serverSocket);
+        serverSocketThread.start();
+
+        super.doStart();
+    }
+
+    /**
+     * Interrupts the acceptor thread, if there is one that has not already terminated, and stops the consumer.
+     * <p/>
+     * The acceptor thread reference is null when doStart() failed before creating it or when the consumer
+     * has already been stopped, so its absence is tolerated.
+     *
+     * @throws Exception if the parent stop fails
+     */
+    @Override
+    protected void doStop() throws Exception {
+        log.debug("doStop()");
+
+        if (null != serverSocketThread) {
+            // Every state other than TERMINATED needs the interrupt
+            if (Thread.State.TERMINATED != serverSocketThread.getState()) {
+                serverSocketThread.interrupt();
+            }
+            serverSocketThread = null;
+        }
+
+        super.doStop();
+    }
+
+    /**
+     * Logs the suspend request and delegates to the parent implementation.
+     *
+     * @throws Exception if the parent suspend fails
+     */
+    @Override
+    protected void doSuspend() throws Exception {
+        log.debug("doSuspend()");
+        super.doSuspend();
+    }
+
+    /**
+     * Logs the resume request and delegates to the parent resume implementation.
+     *
+     * @throws Exception if the parent resume fails
+     */
+    @Override
+    protected void doResume() throws Exception {
+        log.debug("doResume()");
+
+        // Resume must delegate to the parent's resume; the suspend hook must not be invoked again here
+        super.doResume();
+    }
+
+    /**
+     * Logs the shutdown request and delegates to the parent implementation.
+     *
+     * @throws Exception if the parent shutdown fails
+     */
+    @Override
+    protected void doShutdown() throws Exception {
+        log.debug("doShutdown()");
+        super.doShutdown();
+    }
+
+
+    /**
+     * Nested Class to handle the ServerSocket.accept requests
+     */
+    class ServerSocketThread extends Thread {
+        ServerSocket serverSocket;
+
+        /**
+         * Creates the acceptor thread for the given ServerSocket and names the thread after it.
+         *
+         * @param serverSocket the ServerSocket this thread accepts connections on
+         */
+        ServerSocketThread(ServerSocket serverSocket) {
+            this.serverSocket = serverSocket;
+            setName(createThreadName(serverSocket));
+        }
+
+        /**
+         * Builds the thread name from the class name, the endpoint key and the local socket address.
+         * <p/>
+         * The result has the format {@code <class name>[<endpoint key>] - <local socket address>}, where the
+         * class name has its package removed and the endpoint key has its options removed.
+         *
+         * @param serverSocket the ServerSocket whose local address is included in the name
+         * @return String for thread name
+         */
+        String createThreadName(ServerSocket serverSocket) {
+            // Class.getName() of a nested class contains the enclosing class name; only the package is dropped
+            String qualifiedName = this.getClass().getName();
+            String simpleName = qualifiedName.substring(qualifiedName.lastIndexOf('.') + 1);
+
+            // Everything from the first '?' onwards is the option list and is left out
+            String key = endpoint.getEndpointKey();
+            int optionsStart = key.indexOf('?');
+            String keyWithoutOptions = optionsStart < 0 ? key : key.substring(0, optionsStart);
+
+            return String.format("%s[%s] - %s", simpleName, keyWithoutOptions, serverSocket.getLocalSocketAddress());
+        }
+
+        /**
+         * The main ServerSocket.accept() loop
+         * <p/>
+         * NOTE:  When a connection is received, the Socket is checked after a brief delay in an attempt to determine
+         * if this is a load-balancer probe.  The test is done before the ClientSocketThread is created to avoid creating
+         * a large number of short lived threads, which is what can occur if the load balancer polling interval is very
+         * short.
+         * <p/>
+         * The loop ends when the thread is interrupted or the ServerSocket is no longer bound or is closed.  The
+         * interrupt flag is polled on every pass because interrupting the thread does not wake up a blocked
+         * ServerSocket.accept() call.  The ServerSocket is closed when the loop ends.
+         */
+        public void run() {
+            log.debug("Starting acceptor thread");
+
+            while (!isInterrupted() && null != serverSocket && serverSocket.isBound() && !serverSocket.isClosed()) {
+                Socket socket = null;
+                try {
+                    /* ? set this here ? */
+                    // serverSocket.setSoTimeout( 10000 );
+                    // TODO: Need to check maxConnections and figure out what to do when exceeded
+                    socket = serverSocket.accept();
+
+                    /* Wait a bit and then check and see if the socket is really there - it could be a load balancer
+                     pinging the port
+                      */
+                    Thread.sleep(100);
+                    if (socket.isConnected() && !socket.isClosed()) {
+                        log.debug("Socket appears to be there - check for available data");
+                        InputStream inputStream;
+                        try {
+                            inputStream = socket.getInputStream();
+                        } catch (IOException ioEx) {
+                            // Bad Socket -
+                            log.warn("Failed to retrieve the InputStream for socket after the initial connection was accepted");
+                            MllpUtil.resetConnection(socket);
+                            continue;
+                        }
+
+                        if (0 < inputStream.available()) {
+                            // Something is there - start the client thread
+                            ClientSocketThread clientThread = new ClientSocketThread(socket, null);
+                            clientThreads.add(clientThread);
+                            clientThread.start();
+                            continue;
+                        }
+
+                        // The easy check failed - so trigger a blocking read
+                        socket.setSoTimeout(100);
+                        try {
+                            int tmpByte = inputStream.read();
+                            socket.setSoTimeout(endpoint.responseTimeout);
+                            if (-1 == tmpByte) {
+                                log.debug("Socket.read() returned END_OF_STREAM - resetting connection");
+                                MllpUtil.resetConnection(socket);
+                            } else {
+                                // The byte that was consumed by the probe is handed to the client thread
+                                ClientSocketThread clientThread = new ClientSocketThread(socket, tmpByte);
+                                clientThreads.add(clientThread);
+                                clientThread.start();
+                            }
+                        } catch (SocketTimeoutException timeoutEx) {
+                            // No data, but the socket is there
+                            log.debug("No Data - but the socket is there.  Starting ClientSocketThread");
+                            ClientSocketThread clientThread = new ClientSocketThread(socket, null);
+                            clientThreads.add(clientThread);
+                            clientThread.start();
+                        }
+                    }
+                } catch (SocketTimeoutException timeoutEx) {
+                    // No new clients
+                    log.trace("SocketTimeoutException waiting for new connections - no new connections");
+
+                    // Use the idle time to forget the client threads that have finished
+                    for (int i = clientThreads.size() - 1; i >= 0; --i) {
+                        ClientSocketThread thread = clientThreads.get(i);
+                        if (!thread.isAlive()) {
+                            clientThreads.remove(i);
+                        }
+                    }
+                } catch (InterruptedException interruptEx) {
+                    log.info("accept loop interrupted - closing ServerSocket");
+
+                    // The interrupt can only surface in the sleep above, so the accepted socket has no owner yet
+                    if (null != socket) {
+                        try {
+                            socket.close();
+                        } catch (IOException closeEx) {
+                            log.debug("Exception encountered closing accepted Socket after InterruptedException", closeEx);
+                        }
+                    }
+
+                    // Restore the flag so the loop condition ends the thread
+                    Thread.currentThread().interrupt();
+                } catch (Exception ex) {
+                    log.error("Exception accepting new connection", ex);
+                }
+            }
+
+            // Release the listening port on every exit path
+            if (null != serverSocket && !serverSocket.isClosed()) {
+                try {
+                    serverSocket.close();
+                } catch (Exception ex) {
+                    log.warn("Exception encountered closing ServerSocket", ex);
+                }
+            }
+        }
+
+    }
+
+    class ClientSocketThread extends Thread {
+        Socket clientSocket;
+        Hl7AcknowledgementGenerator acknowledgementGenerator = new Hl7AcknowledgementGenerator();
+
+        Integer initialByte;
+
+        /**
+         * Creates the thread that serves one client connection and applies the endpoint's socket options to it.
+         *
+         * @param clientSocket the accepted client connection
+         * @param initialByte  a byte that was already read from the connection, or null if nothing has been read
+         * @throws IOException if one of the socket options cannot be applied
+         */
+        ClientSocketThread(Socket clientSocket, Integer initialByte) throws IOException {
+            this.clientSocket = clientSocket;
+            this.initialByte = initialByte;
+            setName(createThreadName(clientSocket));
+
+            clientSocket.setKeepAlive(endpoint.keepAlive);
+            clientSocket.setTcpNoDelay(endpoint.tcpNoDelay);
+
+            // The buffer sizes are optional - the socket defaults are kept when they are not configured
+            if (endpoint.receiveBufferSize != null) {
+                clientSocket.setReceiveBufferSize(endpoint.receiveBufferSize);
+            }
+            if (endpoint.sendBufferSize != null) {
+                clientSocket.setSendBufferSize(endpoint.sendBufferSize);
+            }
+
+            clientSocket.setReuseAddress(endpoint.reuseAddress);
+            clientSocket.setSoLinger(false, -1);
+
+            // The response timeout is used as the read timeout of the connection
+            clientSocket.setSoTimeout(endpoint.responseTimeout);
+        }
+
+        /**
+         * Builds the thread name from the class name, the endpoint key and the addresses of the connection.
+         * <p/>
+         * The result has the format
+         * {@code <class name>[<endpoint key>] - <local socket address> -> <remote socket address>}, where the
+         * class name has its package removed and the endpoint key has its options removed.
+         *
+         * @param socket the connection whose local and remote addresses are included in the name
+         * @return the thread name
+         */
+        String createThreadName(Socket socket) {
+            // Class.getName() of a nested class contains the enclosing class name; only the package is dropped
+            String qualifiedName = this.getClass().getName();
+            String simpleName = qualifiedName.substring(qualifiedName.lastIndexOf('.') + 1);
+
+            // Everything from the first '?' onwards is the option list and is left out
+            String key = endpoint.getEndpointKey();
+            int optionsStart = key.indexOf('?');
+            String keyWithoutOptions = optionsStart < 0 ? key : key.substring(0, optionsStart);
+
+            return String.format("%s[%s] - %s -> %s", simpleName, keyWithoutOptions, socket.getLocalSocketAddress(), socket.getRemoteSocketAddress());
+        }
+
+        @Override
+        public void run() {
+
+            while (null != clientSocket && clientSocket.isConnected() && !clientSocket.isClosed()) {
+                byte[] hl7MessageBytes = null;
+                // Send the message on for processing and wait for the response
+                log.debug("Reading data ....");
+                try {
+                    if (null != initialByte && START_OF_BLOCK == initialByte) {
+                        hl7MessageBytes = MllpUtil.closeFrame(clientSocket);
+                    } else {
+                        try {
+                            MllpUtil.openFrame(clientSocket);
+                        } catch (SocketTimeoutException timeoutEx) {
+                            // When thrown by openFrame, it indicates that no data was available - but no error
+                            continue;
+                        }
+                        hl7MessageBytes = MllpUtil.closeFrame(clientSocket);
+                    }
+                } catch (MllpException mllpEx) {
+                    Exchange exchange = endpoint.createExchange(ExchangePattern.InOut);
+                    exchange.setException(mllpEx);
+                    return;
+                } finally {
+                    initialByte = null;
+                }
+
+                if (null == hl7MessageBytes) {
+                    continue;
+                }
+
+                log.debug("Populating the exchange with received message");
+                Exchange exchange = endpoint.createExchange(ExchangePattern.InOut);
+                Message message = exchange.getIn();
+                message.setBody(hl7MessageBytes, byte[].class);
+
+                message.setHeader(MLLP_LOCAL_ADDRESS, clientSocket.getLocalAddress().toString());
+                message.setHeader(MLLP_REMOTE_ADDRESS, clientSocket.getRemoteSocketAddress());
+
+                populateHl7DataHeaders(exchange, message, hl7MessageBytes);
+
+
+                log.debug("Calling processor");
+                try {
+                    getProcessor().process(exchange);
+                    // processed the message - send the acknowledgement
+
+                    // Check BEFORE_SEND Properties
+                    if (exchange.getProperty(MLLP_RESET_CONNECTION_BEFORE_SEND, boolean.class)) {
+                        MllpUtil.resetConnection(clientSocket);
+                        return;
+                    } else if (exchange.getProperty(MLLP_CLOSE_CONNECTION_BEFORE_SEND, boolean.class)) {
+                        MllpUtil.closeConnection(clientSocket);
+                    }
+
+                    // Find the acknowledgement body
+                    byte[] acknowledgementMessageBytes = exchange.getProperty(MLLP_ACKNOWLEDGEMENT, byte[].class);
+                    String acknowledgementMessageType = null;
+                    if (null == acknowledgementMessageBytes) {
+                        if (!endpoint.autoAck) {
+                            exchange.setException(new MllpInvalidAcknowledgementException("Automatic Acknowledgement is disabled and the "
+                                    + MLLP_ACKNOWLEDGEMENT + " exchange property is null or cannot be converted to byte[]"));
+                            return;
+                        }
+
+                        String acknowledgmentTypeProperty = exchange.getProperty(MLLP_ACKNOWLEDGEMENT_TYPE, String.class);
+                        try {
+                            if (null == acknowledgmentTypeProperty) {
+                                if (null == exchange.getException()) {
+                                    acknowledgementMessageType = "AA";
+                                    acknowledgementMessageBytes = acknowledgementGenerator.generateApplicationAcceptAcknowledgementMessage(hl7MessageBytes);
+                                } else {
+                                    acknowledgementMessageType = "AE";
+                                    acknowledgementMessageBytes = acknowledgementGenerator.generateApplicationErrorAcknowledgementMessage(hl7MessageBytes);
+                                }
+                            } else {
+                                switch (acknowledgmentTypeProperty) {
+                                case "AA":
+                                    acknowledgementMessageType = "AA";
+                                    acknowledgementMessageBytes = acknowledgementGenerator.generateApplicationAcceptAcknowledgementMessage(hl7MessageBytes);
+                                    break;
+                                case "AE":
+                                    acknowledgementMessageType = "AE";
+                                    acknowledgementMessageBytes = acknowledgementGenerator.generateApplicationErrorAcknowledgementMessage(hl7MessageBytes);
+                                    break;
+                                case "AR":
+                                    acknowledgementMessageType = "AR";
+                                    acknowledgementMessageBytes = acknowledgementGenerator.generateApplicationRejectAcknowledgementMessage(hl7MessageBytes);
+                                    break;
+                                default:
+                                    exchange.setException(new Hl7AcknowledgementGenerationException("Unsupported acknowledgment type: " + acknowledgmentTypeProperty));
+                                    return;
+                                }
+                            }
+                        } catch (Hl7AcknowledgementGenerationException ackGenerationException) {
+                            exchange.setException(ackGenerationException);
+                        }
+                    } else {
+                        final byte bM = 77;
+                        final byte bS = 83;
+                        final byte bA = 65;
+                        final byte bE = 69;
+                        final byte bR = 82;
+
+                        final byte fieldSeparator = hl7MessageBytes[3];
+                        // Acknowledgment is specified in exchange property - determine the acknowledgement type
+                        for (int i = 0; i < hl7MessageBytes.length; ++i) {
+                            if (SEGMENT_DELIMITER == i) {
+                                if (i + 7 < hl7MessageBytes.length // Make sure we don't run off the end of the message
+                                        && bM == hl7MessageBytes[i + 1] && bS == hl7MessageBytes[i + 2] && bA == hl7MessageBytes[i + 3] && fieldSeparator == hl7MessageBytes[i + 4]) {
+                                    if (fieldSeparator != hl7MessageBytes[i + 7]) {
+                                        log.warn("MSA-1 is longer than 2-bytes - ignoring trailing bytes");
+                                    }
+                                    // Found MSA - pull acknowledgement bytes
+                                    byte[] acknowledgmentTypeBytes = new byte[2];
+                                    acknowledgmentTypeBytes[0] = hl7MessageBytes[i + 5];
+                                    acknowledgmentTypeBytes[1] = hl7MessageBytes[i + 6];
+                                    acknowledgementMessageType = IOConverter.toString(acknowledgmentTypeBytes, exchange);
+
+                                    // Verify it's a valid acknowledgement code
+                                    if (bA != acknowledgmentTypeBytes[0]) {
+                                        switch (acknowledgementMessageBytes[1]) {
+                                        case bA:
+                                        case bR:
+                                        case bE:
+                                            break;
+                                        default:
+                                            log.warn("Invalid acknowledgement type [" + acknowledgementMessageType + "] found in message - should be AA, AE or AR");
+                                        }
+                                    }
+
+                                    // if the MLLP_ACKNOWLEDGEMENT_TYPE property is set on the exchange, make sure it matches
+                                    String acknowledgementTypeProperty = exchange.getProperty(MLLP_ACKNOWLEDGEMENT_TYPE, String.class);
+                                    if (null != acknowledgementTypeProperty && !acknowledgementTypeProperty.equals(acknowledgementMessageType)) {
+                                        log.warn("Acknowledgement type found in message [" + acknowledgementMessageType + "] does not match "
+                                                + MLLP_ACKNOWLEDGEMENT_TYPE + " exchange property value [" + acknowledgementTypeProperty + "] - using value found in message");
+                                    }
+                                }
+                            }
+                        }
+                    }
+
+                    // Send the acknowledgement
+                    log.debug("Writing Acknowledgement");
+                    MllpUtil.writeFramedPayload(clientSocket, acknowledgementMessageBytes);
+                    exchange.getIn().setHeader(MLLP_ACKNOWLEDGEMENT, acknowledgementMessageBytes);
+                    exchange.getIn().setHeader(MLLP_ACKNOWLEDGEMENT_TYPE, acknowledgementMessageType);
+
+                    // Check AFTER_SEND Properties
+                    if (exchange.getProperty(MLLP_RESET_CONNECTION_AFTER_SEND, boolean.class)) {
+                        MllpUtil.resetConnection(clientSocket);
+                        return;
+                    } else if (exchange.getProperty(MLLP_CLOSE_CONNECTION_AFTER_SEND, boolean.class)) {
+                        MllpUtil.closeConnection(clientSocket);
+                    }
+
+                } catch (Exception e) {
+                    exchange.setException(e);
+                }
+
+            }
+
+            log.info("ClientSocketThread exiting");
+
+        }
+
+        /**
+         * Copies fields of the MSH segment of the received message into headers of the given message.
+         * <p/>
+         * The field separator is the fourth byte of the message and the component separator the fifth.  The MSH
+         * segment ends at the first segment delimiter.  A header is only set when its field is present in the
+         * MSH segment; the last field of the segment does not need a trailing field separator.  Nothing is set
+         * when the message is shorter than five bytes or contains no segment delimiter.
+         *
+         * @param exchange        the exchange that determines the charset used to decode the fields
+         * @param message         the message that receives the headers
+         * @param hl7MessageBytes the received message
+         */
+        private void populateHl7DataHeaders(Exchange exchange, Message message, byte[] hl7MessageBytes) {
+            if (null == hl7MessageBytes || hl7MessageBytes.length < 5) {
+                log.error("Population of message headers failed - message is too short to contain the MSH separators");
+                return;
+            }
+
+            // Find the end of the MSH and indexes of the fields in the MSH to populate message headers
+            final byte fieldSeparator = hl7MessageBytes[3];
+            final byte componentSeparator = hl7MessageBytes[4];
+            int endOfMSH = -1;
+            List<Integer> fieldSeparatorIndexes = new ArrayList<>(10);  // We need at least 10 fields to create the acknowledgment
+
+            for (int i = 0; i < hl7MessageBytes.length; ++i) {
+                if (fieldSeparator == hl7MessageBytes[i]) {
+                    fieldSeparatorIndexes.add(i);
+                } else if (SEGMENT_DELIMITER == hl7MessageBytes[i]) {
+                    endOfMSH = i;
+                    break;
+                }
+            }
+
+            if (-1 == endOfMSH) {
+                // TODO:  May want to throw some sort of an Exception here
+                log.error("Population of message headers failed - unable to find the end of the MSH segment");
+                return;
+            }
+
+            log.debug("Populating the message headers");
+            Charset charset = Charset.forName(IOHelper.getCharsetName(exchange));
+
+            // MSH-n starts after the separator with index n - 2
+            setMshHeader(message, MLLP_SENDING_APPLICATION, hl7MessageBytes, fieldSeparatorIndexes, 1, endOfMSH, charset);    // MSH-3
+            setMshHeader(message, MLLP_SENDING_FACILITY, hl7MessageBytes, fieldSeparatorIndexes, 2, endOfMSH, charset);       // MSH-4
+            setMshHeader(message, MLLP_RECEIVING_APPLICATION, hl7MessageBytes, fieldSeparatorIndexes, 3, endOfMSH, charset);  // MSH-5
+            setMshHeader(message, MLLP_RECEIVING_FACILITY, hl7MessageBytes, fieldSeparatorIndexes, 4, endOfMSH, charset);     // MSH-6
+            setMshHeader(message, MLLP_TIMESTAMP, hl7MessageBytes, fieldSeparatorIndexes, 5, endOfMSH, charset);              // MSH-7
+            setMshHeader(message, MLLP_SECURITY, hl7MessageBytes, fieldSeparatorIndexes, 6, endOfMSH, charset);               // MSH-8
+            setMshHeader(message, MLLP_MESSAGE_TYPE, hl7MessageBytes, fieldSeparatorIndexes, 7, endOfMSH, charset);           // MSH-9
+            setMshHeader(message, MLLP_MESSAGE_CONTROL, hl7MessageBytes, fieldSeparatorIndexes, 8, endOfMSH, charset);        // MSH-10
+            setMshHeader(message, MLLP_PROCESSING_ID, hl7MessageBytes, fieldSeparatorIndexes, 9, endOfMSH, charset);          // MSH-11
+            setMshHeader(message, MLLP_VERSION_ID, hl7MessageBytes, fieldSeparatorIndexes, 10, endOfMSH, charset);            // MSH-12
+            setMshHeader(message, MLLP_CHARSET, hl7MessageBytes, fieldSeparatorIndexes, 16, endOfMSH, charset);               // MSH-18
+
+            // Split MSH-9 at its first component separator
+            if (fieldSeparatorIndexes.size() > 7) {
+                int messageTypeStart = fieldSeparatorIndexes.get(7) + 1;
+                int messageTypeEnd = fieldSeparatorIndexes.size() > 8 ? fieldSeparatorIndexes.get(8) : endOfMSH;
+                for (int i = messageTypeStart; i < messageTypeEnd; ++i) {
+                    if (componentSeparator == hl7MessageBytes[i]) {
+                        // MSH-9.1
+                        message.setHeader(MLLP_EVENT_TYPE, new String(hl7MessageBytes, messageTypeStart, i - messageTypeStart, charset));
+                        // MSH-9.2
+                        message.setHeader(MLLP_TRIGGER_EVENT, new String(hl7MessageBytes, i + 1, messageTypeEnd - i - 1, charset));
+                        break;
+                    }
+                }
+            }
+        }
+
+        /**
+         * Sets a header to the MSH field that starts after the given field separator, if that field exists.
+         * <p/>
+         * The field ends at the next field separator or, for the last field, at the end of the MSH segment.
+         *
+         * @param message               the message that receives the header
+         * @param headerName            the name of the header to set
+         * @param hl7MessageBytes       the received message
+         * @param fieldSeparatorIndexes the positions of the field separators within the MSH segment
+         * @param separatorIndex        the index, within fieldSeparatorIndexes, of the separator preceding the field
+         * @param endOfMSH              the position of the segment delimiter that ends the MSH segment
+         * @param charset               the charset used to decode the field
+         */
+        private void setMshHeader(Message message, String headerName, byte[] hl7MessageBytes, List<Integer> fieldSeparatorIndexes,
+                                  int separatorIndex, int endOfMSH, Charset charset) {
+            if (separatorIndex >= fieldSeparatorIndexes.size()) {
+                // The MSH segment ends before this field
+                return;
+            }
+
+            int start = fieldSeparatorIndexes.get(separatorIndex) + 1;
+            int end = separatorIndex + 1 < fieldSeparatorIndexes.size() ? fieldSeparatorIndexes.get(separatorIndex + 1) : endOfMSH;
+            message.setHeader(headerName, new String(hl7MessageBytes, start, end - start, charset));
+        }
+
+
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/camel/blob/bd1661b2/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTimeoutException.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTimeoutException.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTimeoutException.java
new file mode 100644
index 0000000..0e54772
--- /dev/null
+++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTimeoutException.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mllp;
+
+/**
+ * Raised when an MLLP Producer or Consumer encounters a timeout reading a message.
+ * <p/>
+ * Every constructor forwards its arguments unchanged to the matching MllpException constructor.
+ */
+public class MllpTimeoutException extends MllpException {
+
+    /**
+     * @param message the description of the timeout
+     */
+    public MllpTimeoutException(String message) {
+        super(message);
+    }
+
+    /**
+     * @param message the description of the timeout
+     * @param cause   the exception that led to the timeout being reported
+     */
+    public MllpTimeoutException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    /**
+     * @param message     the description of the timeout
+     * @param mllpPayload the payload bytes handed on to MllpException
+     */
+    public MllpTimeoutException(String message, byte[] mllpPayload) {
+        super(message, mllpPayload);
+    }
+
+    /**
+     * @param message     the description of the timeout
+     * @param mllpPayload the payload bytes handed on to MllpException
+     * @param cause       the exception that led to the timeout being reported
+     */
+    public MllpTimeoutException(String message, byte[] mllpPayload, Throwable cause) {
+        super(message, mllpPayload, cause);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/bd1661b2/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpWriteException.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpWriteException.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpWriteException.java
new file mode 100644
index 0000000..43fe740
--- /dev/null
+++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpWriteException.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mllp;
+
+/**
+ * Raised when an MLLP Producer or Consumer encounters an error transmitting data.
+ * <p/>
+ * Every constructor forwards its arguments unchanged to the matching MllpException constructor.
+ */
+public class MllpWriteException extends MllpException {
+
+    /**
+     * @param message the description of the write failure
+     */
+    public MllpWriteException(String message) {
+        super(message);
+    }
+
+    /**
+     * @param message the description of the write failure
+     * @param cause   the exception that led to the write failure being reported
+     */
+    public MllpWriteException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    /**
+     * @param message     the description of the write failure
+     * @param mllpPayload the payload bytes handed on to MllpException
+     */
+    public MllpWriteException(String message, byte[] mllpPayload) {
+        super(message, mllpPayload);
+    }
+
+    /**
+     * @param message     the description of the write failure
+     * @param mllpPayload the payload bytes handed on to MllpException
+     * @param cause       the exception that led to the write failure being reported
+     */
+    public MllpWriteException(String message, byte[] mllpPayload, Throwable cause) {
+        super(message, mllpPayload, cause);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/bd1661b2/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/impl/MllpUtil.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/impl/MllpUtil.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/impl/MllpUtil.java
new file mode 100644
index 0000000..4fd3fbf
--- /dev/null
+++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/impl/MllpUtil.java
@@ -0,0 +1,371 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mllp.impl;
+
+import java.io.BufferedOutputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.net.SocketTimeoutException;
+
+import org.apache.camel.component.mllp.MllpComponent;
+import org.apache.camel.component.mllp.MllpCorruptFrameException;
+import org.apache.camel.component.mllp.MllpException;
+import org.apache.camel.component.mllp.MllpTimeoutException;
+import org.apache.camel.component.mllp.MllpWriteException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.camel.component.mllp.MllpEndpoint.END_OF_BLOCK;
+import static org.apache.camel.component.mllp.MllpEndpoint.END_OF_DATA;
+import static org.apache.camel.component.mllp.MllpEndpoint.END_OF_STREAM;
+import static org.apache.camel.component.mllp.MllpEndpoint.START_OF_BLOCK;
+
+
+/**
+ * Supplies methods to read and write messages in a MLLP Frame.
+ * <p/>
+ * Although the methods in the class are intended to handle HL7 v2 formatted messages, the methods do not
+ * depend on that format - any byte[] can be written to the Socket.  Also, any byte[] can be read from the socket
+ * provided it has the proper MLLP Enveloping - <START_OF_BLOCK>payload<END_OF_BLOCK><END_OF_DATA>.
+ * <p/>
+ * NOTE: MLLP payloads are not logged unless the logging level is set to DEBUG or TRACE to avoid introducing PHI
+ * into the log files.  Logging of PHI can be globally disabled by setting the org.apache.camel.mllp.logPHI system
+ * property.  The property is evaluated using Boolean.parseBoolean.
+ * <p/>
+ */
+public final class MllpUtil {
+    static Logger log = LoggerFactory.getLogger(MllpUtil.class);
+
+    private MllpUtil() {
+    }
+
+    /**
+     * Open the MLLP frame by reading from the Socket until the beginning of the frame is found.
+     * <p/>
+     * Nothing is read if the socket is not connected or is already closed.  If END_OF_STREAM is the first byte read,
+     * the connection is reset and the method returns without throwing.
+     * <p/>
+     * If any other byte precedes START_OF_BLOCK, it is treated as out-of-frame data: the data is consumed and a
+     * MllpCorruptFrameException carrying that data is thrown.  The connection is reset when END_OF_STREAM, a timeout
+     * or an IOException is encountered while reading; it is NOT reset when the timeout occurs on the very first read,
+     * nor when START_OF_BLOCK is eventually found after out-of-frame data.
+     *
+     * @param socket the Socket to read
+     * @throws SocketTimeoutException    thrown if a timeout occurs while looking for the beginning of the MLLP frame, but
+     *                                   nothing is yet available - this is NOT an error condition
+     * @throws MllpCorruptFrameException if the MLLP Frame is corrupted in some way; when the corruption was detected
+     *                                   because of a timeout or an IOException, that exception is attached as the cause
+     * @throws MllpException             for other unexpected error conditions
+     */
+    public static void openFrame(Socket socket) throws SocketTimeoutException, MllpCorruptFrameException, MllpException {
+        if (socket.isConnected() && !socket.isClosed()) {
+            InputStream socketInputStream = MllpUtil.getInputStream(socket);
+
+            int readByte;
+            try {
+                readByte = socketInputStream.read();
+                switch (readByte) {
+                case START_OF_BLOCK:
+                    return;
+                case END_OF_STREAM:
+                    resetConnection(socket);
+                    return;
+                default:
+                    // Continue on and process the out-of-frame data
+                }
+            } catch (SocketTimeoutException normalTimeoutEx) {
+                // Just pass this on - the caller will wrap it in a MllpTimeoutException
+                throw normalTimeoutEx;
+            } catch (IOException unexpectedException) {
+                log.error("Unexpected Exception occurred opening MLLP frame - resetting the connection");
+                MllpUtil.resetConnection(socket);
+                throw new MllpException("Unexpected Exception occurred opening MLLP frame", unexpectedException);
+            }
+
+            /*
+             From here on, we're in a bad frame state.  Read what's left in the socket and report the
+             out-of-frame data through a MllpCorruptFrameException.
+              */
+            ByteArrayOutputStream outOfFrameData = new ByteArrayOutputStream();
+            outOfFrameData.write(readByte);
+
+            try {
+                while (true) {
+                    readByte = socketInputStream.read();
+                    switch (readByte) {
+                    case END_OF_STREAM:
+                        if (isLogPHIEnabled(log)) {
+                            log.error("END_OF_STREAM read while looking for the beginning of the MLLP frame, and "
+                                            + "out-of-frame data had been read - resetting connection and eating out-of-frame data: {}",
+                                    outOfFrameData.toString().replace('\r', '\n'));
+                        } else {
+                            log.error("END_OF_STREAM read while looking for the beginning of the MLLP frame, and out-of-frame data had been read - resetting connection and eating out-of-frame data");
+                        }
+                        resetConnection(socket);
+
+                        throw new MllpCorruptFrameException("END_OF_STREAM read while looking for the beginning of the MLLP frame", outOfFrameData.toByteArray());
+                    case START_OF_BLOCK:
+                        // The connection is intentionally left open here; only the out-of-frame data is discarded
+                        if (isLogPHIEnabled(log)) {
+                            log.warn("The beginning of the MLLP frame was preceded by out-of-frame data - eating data: {}", outOfFrameData.toString().replace('\r', '\n'));
+                        } else {
+                            log.warn("The beginning of the MLLP frame was preceded by out-of-frame data - eating data");
+                        }
+
+                        throw new MllpCorruptFrameException("The beginning of the MLLP frame was preceded by out-of-frame data", outOfFrameData.toByteArray());
+                    default:
+                        // still reading out-of-frame data
+                        outOfFrameData.write(readByte);
+                        break;
+                    }
+                }
+            } catch (SocketTimeoutException timeoutEx) {
+                if (isLogPHIEnabled(log)) {
+                    log.error("Timeout looking for the beginning of the MLLP frame, and out-of-frame data had been read - resetting connection and eating out-of-frame data: {}",
+                            outOfFrameData.toString().replace('\r', '\n'));
+                } else {
+                    log.error("Timeout looking for the beginning of the MLLP frame, and out-of-frame data had been read - resetting connection and eating out-of-frame data");
+                }
+
+                resetConnection(socket);
+
+                // Keep the timeout as the cause so the original failure is not lost
+                throw new MllpCorruptFrameException("Timeout looking for the beginning of the MLLP frame, and out-of-frame data had been read", outOfFrameData.toByteArray(), timeoutEx);
+            } catch (IOException ioEx) {
+                if (isLogPHIEnabled(log)) {
+                    log.error("Exception encountered looking for the beginning of the MLLP frame, and out-of-frame data had been read - resetting connection and eating out-of-frame data: {}",
+                            outOfFrameData.toString().replace('\r', '\n'));
+                } else {
+                    log.error("Exception encountered looking for the beginning of the MLLP frame, and out-of-frame data had been read - resetting connection and eating out-of-frame data");
+                }
+
+                resetConnection(socket);
+
+                // Keep the IOException as the cause so the original failure is not lost
+                throw new MllpCorruptFrameException("Exception encountered looking for the beginning of the MLLP frame, and out-of-frame data had been read", outOfFrameData.toByteArray(), ioEx);
+            }
+        }
+    }
+
+    /**
+     * Close a MLLP frame by reading from the socket until the end of the frame is found.
+     * <p/>
+     * The method assumes the MLLP frame has already been opened and the first byte available
+     * will be the first byte of the framed message.
+     * <p/>
+     * The method consumes the END_OF_BLOCK and END_OF_DATA bytes from the stream before returning the payload
+     * <p/>
+     * If any errors occur (including MLLP frame errors) while opening the frame, the socket will be closed and an
+     * Exception will be thrown.
+     *
+     * @param socket the Socket to be read
+     * @return the payload of the MLLP-Enveloped message as a byte[]
+     * @throws MllpTimeoutException      thrown if a timeout occurs while closing the MLLP frame
+     * @throws MllpCorruptFrameException if the MLLP Frame is corrupted in some way
+     * @throws MllpException             for other unexpected error conditions
+     */
+    public static byte[] closeFrame(Socket socket) throws MllpTimeoutException, MllpCorruptFrameException, MllpException {
+        if (socket.isConnected() && !socket.isClosed()) {
+            InputStream socketInputStream = MllpUtil.getInputStream(socket);
+            // TODO:  Come up with an intelligent way to size this stream
+            ByteArrayOutputStream payload = new ByteArrayOutputStream(4096);
+            try {
+                while (true) {
+                    int readByte = socketInputStream.read();
+                    switch (readByte) {
+                    case END_OF_STREAM:
+                        if (isLogPHIEnabled(log)) {
+                            log.error("END_OF_STREAM read while looking for the end of the MLLP frame - resetting connection and eating data: {}", payload.toString().replace('\r', '\n'));
+                        } else {
+                            log.error("END_OF_STREAM read while looking for the end of the MLLP frame - resetting connection and eating data");
+                        }
+
+                        resetConnection(socket);
+
+                        throw new MllpCorruptFrameException("END_OF_STREAM read while looking for the end of the MLLP frame", payload.size() > 0 ? payload.toByteArray() : null);
+                    case START_OF_BLOCK:
+                        if (isLogPHIEnabled(log)) {
+                            log.error("A new MLLP frame was opened before the previous frame was closed - resetting connection and eating data: {}", payload.toString().replace('\r', '\n'));
+                        } else {
+                            log.error("A new MLLP frame was opened before the previous frame was closed - resetting connection and eating data");
+                        }
+
+                        resetConnection(socket);
+
+                        throw new MllpCorruptFrameException("A new MLLP frame was opened before the previous frame was closed", payload.size() > 0 ? payload.toByteArray() : null);
+                    case END_OF_BLOCK:
+                        if (END_OF_DATA != socketInputStream.read()) {
+                            if (isLogPHIEnabled(log)) {
+                                log.error("The MLLP frame was partially closed - END_OF_BLOCK was not followed by END_OF_DATA - resetting connection and eating data: {}",
+                                        payload.toString().replace('\r', '\n'));
+                            } else {
+                                log.error("The MLLP frame was partially closed - END_OF_BLOCK was not followed by END_OF_DATA - resetting connection and eating data");
+                            }
+
+                            resetConnection(socket);
+
+                            throw new MllpCorruptFrameException("The MLLP frame was partially closed - END_OF_BLOCK was not followed by END_OF_DATA",
+                                    payload.size() > 0 ? payload.toByteArray() : null);
+                        }
+                        return payload.toByteArray();
+                    default:
+                        // log.trace( "Read Character: {}", (char)readByte );
+                        payload.write(readByte);
+                    }
+                }
+            } catch (SocketTimeoutException timeoutEx) {
+                if (0 < payload.size()) {
+                    if (isLogPHIEnabled(log)) {
+                        log.error("Timeout looking for the end of the MLLP frame - resetting connection and eating data: {}", payload.toString().replace('\r', '\n'));
+                    } else {
+                        log.error("Timeout looking for the end of the MLLP frame - resetting connection and eating data");
+                    }
+                } else {
+                    log.error("Timeout looking for the end of the MLLP frame - resetting connection");
+                }
+
+                resetConnection(socket);
+
+                throw new MllpCorruptFrameException("Timeout looking for the end of the MLLP frame", payload.size() > 0 ? payload.toByteArray() : null, timeoutEx);
+            } catch (IOException ioEx) {
+                if (0 < payload.size()) {
+                    if (isLogPHIEnabled(log)) {
+                        log.error("Exception encountered looking for the end of the MLLP frame - resetting connection and eating data: {}", payload.toString().replace('\r', '\n'));
+                    } else {
+                        log.error("Exception encountered looking for the end of the MLLP frame - resetting connection and eating data");
+                    }
+                } else {
+                    log.error("Exception encountered looking for the end of the MLLP frame - resetting connection");
+                }
+
+                resetConnection(socket);
+
+                throw new MllpException("Exception encountered looking for the end of the MLLP frame", payload.size() > 0 ? payload.toByteArray() : null, ioEx);
+            }
+        }
+
+        return null;
+    }
+
+    /**
+     * Write a MLLP-Framed payload to the Socket.
+     * <p/>
+     * The payload is wrapped as START_OF_BLOCK + payload + END_OF_BLOCK + END_OF_DATA and flushed in one go.
+     * Nothing is written (and no exception is raised) when the socket is not connected or is already closed.
+     * The connection is reset before any exception is thrown.
+     *
+     * @param socket  the Socket to write the payload
+     * @param payload the MLLP payload
+     * @throws MllpWriteException if the write fails
+     * @throws MllpException      if the OutputStream cannot be retrieved from the Socket
+     */
+    public static void writeFramedPayload(Socket socket, byte[] payload) throws MllpException {
+        if (!socket.isConnected() || socket.isClosed()) {
+            return;
+        }
+
+        OutputStream framedOutput;
+        try {
+            // Buffer sized for the payload plus the framing bytes so the frame is sent on flush
+            framedOutput = new BufferedOutputStream(socket.getOutputStream(), payload.length + 4);
+        } catch (IOException ioEx) {
+            log.error("Error Retrieving OutputStream from Socket - resetting connection");
+
+            resetConnection(socket);
+
+            throw new MllpException("Error Retrieving OutputStream from Socket", ioEx);
+        }
+
+        try {
+            framedOutput.write(START_OF_BLOCK);
+            framedOutput.write(payload, 0, payload.length);
+            framedOutput.write(END_OF_BLOCK);
+            framedOutput.write(END_OF_DATA);
+            framedOutput.flush();
+        } catch (IOException ioEx) {
+            log.error("Error writing MLLP payload - resetting connection");
+
+            resetConnection(socket);
+
+            throw new MllpWriteException("Error writing MLLP payload", payload, ioEx);
+        }
+    }
+
+    /**
+     * Close the Socket after shutting down its input and output.
+     * <p/>
+     * Does nothing for a null or already-closed Socket.  Each step is attempted independently; a failure in one
+     * step is logged at WARN level and does not prevent the remaining steps.
+     *
+     * @param socket the Socket to close; may be null
+     */
+    public static void closeConnection(Socket socket) {
+        if (null == socket || socket.isClosed()) {
+            return;
+        }
+
+        try {
+            socket.shutdownInput();
+        } catch (Exception inputEx) {
+            log.warn("Exception encountered shutting down the input stream on the client socket", inputEx);
+        }
+
+        try {
+            socket.shutdownOutput();
+        } catch (Exception outputEx) {
+            log.warn("Exception encountered shutting down the output stream on the client socket", outputEx);
+        }
+
+        try {
+            socket.close();
+        } catch (Exception closeEx) {
+            log.warn("Exception encountered closing the client socket", closeEx);
+        }
+    }
+
+    /**
+     * Reset the connection by enabling SO_LINGER with a zero timeout and then closing the Socket.
+     * <p/>
+     * Does nothing for a null or already-closed Socket: setting SO_LINGER on a closed Socket would only raise a
+     * SocketException and produce a misleading warning.  Failures are logged at WARN level and never propagated.
+     *
+     * @param socket the Socket to reset; may be null
+     */
+    public static void resetConnection(Socket socket) {
+        if (null == socket || socket.isClosed()) {
+            return;
+        }
+
+        try {
+            // Linger of 0 is intended to force a reset instead of a graceful close
+            socket.setSoLinger(true, 0);
+        } catch (Exception ex) {
+            log.warn("Exception encountered setting SO_LINGER to 0 on the socket to force a reset", ex);
+        }
+
+        try {
+            socket.close();
+        } catch (Exception ex) {
+            log.warn("Exception encountered closing the client socket", ex);
+        }
+    }
+
+    /**
+     * Retrieve the InputStream from the Socket.
+     * <p/>
+     * Private utility method that converts an IOException raised while retrieving the InputStream into a
+     * MllpException (with the IOException as its cause).
+     *
+     * @param socket Socket to get the InputStream from
+     * @return the InputStream for the Socket
+     * @throws MllpException if the InputStream cannot be retrieved
+     */
+    private static InputStream getInputStream(Socket socket) throws MllpException {
+        try {
+            return socket.getInputStream();
+        } catch (IOException ioEx) {
+            throw new MllpException("Error Retrieving InputStream from Socket", ioEx);
+        }
+    }
+
+    /**
+     * Determine whether payload data (potential PHI) may be written to the given logger.
+     * <p/>
+     * Payloads are logged only when DEBUG is enabled on the logger AND the system property named by
+     * MllpComponent.MLLP_LOG_PHI_PROPERTY (defaulting to "true") evaluates to true via Boolean.parseBoolean.
+     *
+     * @param targetLogger the logger the payload would be written to
+     * @return true if the payload may be logged; false otherwise
+     */
+    private static boolean isLogPHIEnabled(Logger targetLogger) {
+        // The property is only consulted when DEBUG is enabled, and it is read once per call
+        return targetLogger.isDebugEnabled()
+                && Boolean.parseBoolean(System.getProperty(MllpComponent.MLLP_LOG_PHI_PROPERTY, "true"));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/bd1661b2/components/camel-mllp/src/main/java/org/apache/camel/processor/mllp/Hl7AcknowledgementGenerationException.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/main/java/org/apache/camel/processor/mllp/Hl7AcknowledgementGenerationException.java b/components/camel-mllp/src/main/java/org/apache/camel/processor/mllp/Hl7AcknowledgementGenerationException.java
new file mode 100644
index 0000000..c27c7f9
--- /dev/null
+++ b/components/camel-mllp/src/main/java/org/apache/camel/processor/mllp/Hl7AcknowledgementGenerationException.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.mllp;
+
+/**
+ * Raised when a HL7 acknowledgement cannot be generated for a message.
+ * <p/>
+ * The exception can optionally carry the HL7 message that could not be acknowledged.
+ */
+public class Hl7AcknowledgementGenerationException extends Exception {
+    private final byte[] hl7Message;
+
+    /**
+     * @param message description of the failure; no HL7 message is attached
+     */
+    public Hl7AcknowledgementGenerationException(String message) {
+        this(message, (byte[]) null);
+    }
+
+    /**
+     * @param message    description of the failure
+     * @param hl7Message the HL7 message being processed when the failure occurred
+     */
+    public Hl7AcknowledgementGenerationException(String message, byte[] hl7Message) {
+        super(message);
+        this.hl7Message = hl7Message;
+    }
+
+    /**
+     * @param message    description of the failure
+     * @param hl7Message the HL7 message being processed when the failure occurred
+     * @param cause      the underlying exception
+     */
+    public Hl7AcknowledgementGenerationException(String message, byte[] hl7Message, Throwable cause) {
+        super(message, cause);
+        this.hl7Message = hl7Message;
+    }
+
+    /**
+     * @return the HL7 message attached to this exception (the same array instance that was supplied), or null
+     */
+    public byte[] getHl7Message() {
+        return hl7Message;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/bd1661b2/components/camel-mllp/src/main/java/org/apache/camel/processor/mllp/Hl7AcknowledgementGenerator.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/main/java/org/apache/camel/processor/mllp/Hl7AcknowledgementGenerator.java b/components/camel-mllp/src/main/java/org/apache/camel/processor/mllp/Hl7AcknowledgementGenerator.java
new file mode 100644
index 0000000..b95264a
--- /dev/null
+++ b/components/camel-mllp/src/main/java/org/apache/camel/processor/mllp/Hl7AcknowledgementGenerator.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.mllp;
+
+import java.io.ByteArrayOutputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.Processor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.camel.component.mllp.MllpConstants.MLLP_ACKNOWLEDGEMENT;
+import static org.apache.camel.component.mllp.MllpConstants.MLLP_ACKNOWLEDGEMENT_TYPE;
+import static org.apache.camel.component.mllp.MllpEndpoint.MESSAGE_TERMINATOR;
+import static org.apache.camel.component.mllp.MllpEndpoint.SEGMENT_DELIMITER;
+
+/**
+ * A Camel Processor for generating HL7 Acknowledgements.
+ * <p/>
+ * The acknowledgement is built directly from the bytes of the MSH segment of the message being acknowledged:
+ * the sending and receiving application/facility fields are swapped, MSH-9.1 is replaced with ACK, and a MSA
+ * segment carrying the acknowledgement code and the original MSH-10 is appended.
+ */
+public class Hl7AcknowledgementGenerator implements Processor {
+    Logger log = LoggerFactory.getLogger(this.getClass());
+
+    String defaultNack = "MSH|^~\\&|||||||NACK||P|2.2" + SEGMENT_DELIMITER
+            + "MSA|AR|" + SEGMENT_DELIMITER
+            + MESSAGE_TERMINATOR;
+
+    /**
+     * Generate an acknowledgement for the body of the exchange's current message and store it in the
+     * MLLP_ACKNOWLEDGEMENT header; the MLLP_ACKNOWLEDGEMENT_TYPE header is set to "AA" when the exchange has no
+     * exception and to "AE" otherwise.
+     *
+     * @param exchange the exchange holding the HL7 message (the OUT message is used if present, otherwise IN)
+     * @throws Exception if the body is not available as a byte[] or the acknowledgement cannot be generated
+     */
+    @Override
+    public void process(Exchange exchange) throws Exception {
+        Message message = exchange.hasOut() ? exchange.getOut() : exchange.getIn();
+
+        byte[] hl7Bytes = message.getMandatoryBody(byte[].class);
+
+        byte[] acknowledgementBytes;
+        if (null == exchange.getException()) {
+            acknowledgementBytes = generateApplicationAcceptAcknowledgementMessage(hl7Bytes);
+            message.setHeader(MLLP_ACKNOWLEDGEMENT_TYPE, "AA");
+        } else {
+            acknowledgementBytes = generateApplicationErrorAcknowledgementMessage(hl7Bytes);
+            message.setHeader(MLLP_ACKNOWLEDGEMENT_TYPE, "AE");
+        }
+
+        message.setHeader(MLLP_ACKNOWLEDGEMENT, acknowledgementBytes);
+    }
+
+    /**
+     * Generate an Application Accept (AA) acknowledgement for the given message.
+     */
+    public byte[] generateApplicationAcceptAcknowledgementMessage(byte[] hl7MessageBytes) throws Hl7AcknowledgementGenerationException {
+        return generateAcknowledgementMessage(hl7MessageBytes, "AA");
+    }
+
+    /**
+     * Generate an Application Reject (AR) acknowledgement for the given message.
+     */
+    public byte[] generateApplicationRejectAcknowledgementMessage(byte[] hl7MessageBytes) throws Hl7AcknowledgementGenerationException {
+        return generateAcknowledgementMessage(hl7MessageBytes, "AR");
+    }
+
+    /**
+     * Generate an Application Error (AE) acknowledgement for the given message.
+     */
+    public byte[] generateApplicationErrorAcknowledgementMessage(byte[] hl7MessageBytes) throws Hl7AcknowledgementGenerationException {
+        return generateAcknowledgementMessage(hl7MessageBytes, "AE");
+    }
+
+    /**
+     * Build the acknowledgement bytes for a HL7 message.
+     * <p/>
+     * The first 8 bytes of the message (through MSH-2) are copied verbatim, so the message is expected to start
+     * with "MSH", the field separator and four encoding characters.  The MSH segment must contain at least 9 field
+     * separators (i.e. fields through MSH-10) and be terminated by a SEGMENT_DELIMITER.
+     *
+     * @param hl7MessageBytes     the message to acknowledge
+     * @param acknowledgementCode the two-character code written to MSA-1 (e.g. AA, AE or AR)
+     * @return the acknowledgement message, terminated with SEGMENT_DELIMITER and MESSAGE_TERMINATOR
+     * @throws Hl7AcknowledgementGenerationException if the message is null, too short, has no terminated MSH segment
+     *                                               or has too few fields in the MSH segment
+     */
+    byte[] generateAcknowledgementMessage(byte[] hl7MessageBytes, String acknowledgementCode) throws Hl7AcknowledgementGenerationException {
+        if (hl7MessageBytes == null) {
+            throw new Hl7AcknowledgementGenerationException("Null HL7 message received for parsing operation");
+        }
+
+        // Bytes 3 and 4 are read as separators and the first 8 bytes are copied below - guard against short input
+        if (hl7MessageBytes.length < 8) {
+            throw new Hl7AcknowledgementGenerationException("HL7 message is too short to generate a response - at least 8 bytes are required but "
+                    + hl7MessageBytes.length + " were found", hl7MessageBytes);
+        }
+
+        final byte fieldSeparator = hl7MessageBytes[3];
+        final byte componentSeparator = hl7MessageBytes[4];
+
+        // Entry k is the position of the separator that precedes MSH-(k+2); entry 0 is MSH-1 itself
+        List<Integer> fieldSeparatorIndexes = new ArrayList<>(10);
+
+        // Find the end of the MSH and indexes of the fields in the MSH
+        int endOfMSH = -1;
+        for (int i = 0; i < hl7MessageBytes.length; ++i) {
+            if (fieldSeparator == hl7MessageBytes[i]) {
+                fieldSeparatorIndexes.add(i);
+            } else if (SEGMENT_DELIMITER == hl7MessageBytes[i]) {
+                endOfMSH = i;
+                break;
+            }
+        }
+
+        if (-1 == endOfMSH) {
+            throw new Hl7AcknowledgementGenerationException("Failed to find the end of the  MSH Segment while attempting to generate response", hl7MessageBytes);
+        }
+
+        // Entries 0 through 8 are read unconditionally below, so 9 separators (fields through MSH-10) are required
+        if (9 > fieldSeparatorIndexes.size()) {
+            throw new Hl7AcknowledgementGenerationException("Insufficient number of fields in MSH to generate a response - 9 field separators are required but "
+                    + fieldSeparatorIndexes.size() + " were found", hl7MessageBytes);
+        }
+
+        final int msh10Start = fieldSeparatorIndexes.get(8);
+        // MSH-10 ends at the next field separator, or at the end of the segment when it is the last field
+        final int msh10End = fieldSeparatorIndexes.size() > 9 ? fieldSeparatorIndexes.get(9) : endOfMSH;
+
+        // HL7 delimiters and these literals are ASCII - do not depend on the platform default charset
+        final java.nio.charset.Charset ascii = java.nio.charset.StandardCharsets.US_ASCII;
+
+        // Build the MSH Segment
+        ByteArrayOutputStream acknowledgement = new ByteArrayOutputStream(1024);
+        acknowledgement.write(hl7MessageBytes, 0, 8); // through MSH-2 (without trailing field separator)
+        acknowledgement.write(hl7MessageBytes, fieldSeparatorIndexes.get(3), fieldSeparatorIndexes.get(4) - fieldSeparatorIndexes.get(3)); // MSH-5
+        acknowledgement.write(hl7MessageBytes, fieldSeparatorIndexes.get(4), fieldSeparatorIndexes.get(5) - fieldSeparatorIndexes.get(4)); // MSH-6
+        acknowledgement.write(hl7MessageBytes, fieldSeparatorIndexes.get(1), fieldSeparatorIndexes.get(2) - fieldSeparatorIndexes.get(1)); // MSH-3
+        acknowledgement.write(hl7MessageBytes, fieldSeparatorIndexes.get(2), fieldSeparatorIndexes.get(3) - fieldSeparatorIndexes.get(2)); // MSH-4
+        acknowledgement.write(hl7MessageBytes, fieldSeparatorIndexes.get(5), fieldSeparatorIndexes.get(7) - fieldSeparatorIndexes.get(5)); // MSH-7 and MSH-8
+        // Need to generate the correct MSH-9
+        acknowledgement.write(fieldSeparator);
+        acknowledgement.write("ACK".getBytes(ascii), 0, 3); // MSH-9.1
+        int msh92start = -1;
+        for (int j = fieldSeparatorIndexes.get(7) + 1; j < msh10Start; ++j) {
+            if (componentSeparator == hl7MessageBytes[j]) {
+                msh92start = j;
+                break;
+            }
+        }
+
+        if (-1 == msh92start) {
+            log.warn("Didn't find component separator for MSH-9.2 - sending ACK in MSH-9");
+        } else {
+            acknowledgement.write(hl7MessageBytes, msh92start, msh10Start - msh92start); // MSH-9.2
+        }
+
+        acknowledgement.write(hl7MessageBytes, msh10Start, endOfMSH - msh10Start); // MSH-10 through the end of the MSH
+        acknowledgement.write(SEGMENT_DELIMITER);
+
+        // Build the MSA Segment
+        acknowledgement.write("MSA".getBytes(ascii), 0, 3);
+        acknowledgement.write(fieldSeparator);
+        acknowledgement.write(acknowledgementCode.getBytes(ascii), 0, 2);
+        acknowledgement.write(hl7MessageBytes, msh10Start, msh10End - msh10Start); // field separator + MSH-10
+        acknowledgement.write(SEGMENT_DELIMITER);
+
+        // Terminate the message
+        acknowledgement.write(MESSAGE_TERMINATOR);
+
+        return acknowledgement.toByteArray();
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/bd1661b2/components/camel-mllp/src/main/resources/META-INF/services/org/apache/camel/component/mllp
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/main/resources/META-INF/services/org/apache/camel/component/mllp b/components/camel-mllp/src/main/resources/META-INF/services/org/apache/camel/component/mllp
new file mode 100644
index 0000000..3aedcac
--- /dev/null
+++ b/components/camel-mllp/src/main/resources/META-INF/services/org/apache/camel/component/mllp
@@ -0,0 +1 @@
+class=org.apache.camel.component.mllp.MllpComponent

http://git-wip-us.apache.org/repos/asf/camel/blob/bd1661b2/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpProducerConsumerLoopbackTest.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpProducerConsumerLoopbackTest.java b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpProducerConsumerLoopbackTest.java
new file mode 100644
index 0000000..027de7c
--- /dev/null
+++ b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpProducerConsumerLoopbackTest.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mllp;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.LoggingLevel;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.test.AvailablePortFinder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.apache.camel.test.mllp.PassthroughProcessor;
+import org.hamcrest.CoreMatchers;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.apache.camel.test.mllp.Hl7MessageGenerator.generateMessage;
+
+
+/**
+ * Loopback test: a sender route pushes HL7 messages through an MLLP producer to an MLLP
+ * consumer route on the same port, and the acknowledgement handed back to the caller is
+ * checked for the expected MSA segment.
+ */
+public class MllpProducerConsumerLoopbackTest extends CamelTestSupport {
+    int mllpPort = AvailablePortFinder.getNextAvailable();
+
+    @EndpointInject(uri = "direct://source")
+    ProducerTemplate source;
+
+    @EndpointInject(uri = "mock://acknowledged")
+    MockEndpoint acknowledged;
+
+    /**
+     * Builds the context via the parent implementation, enables MDC logging and names the
+     * context after this test class.
+     */
+    @Override
+    protected CamelContext createCamelContext() throws Exception {
+        DefaultCamelContext context = (DefaultCamelContext) super.createCamelContext();
+
+        context.setUseMDCLogging(true);
+        context.setName(this.getClass().getSimpleName());
+
+        return context;
+    }
+
+    /**
+     * Defines the two routes under test.
+     * <ul>
+     * <li>{@code mllp-receiver}: consumes from the MLLP port with {@code autoAck=true} and
+     * forwards the body (as a String) to the {@code acknowledged} mock.</li>
+     * <li>{@code mllp-sender}: sends whatever arrives on {@code direct://source} to the MLLP
+     * port and replaces the body with the {@link MllpConstants#MLLP_ACKNOWLEDGEMENT} header.</li>
+     * </ul>
+     */
+    @Override
+    protected RouteBuilder[] createRouteBuilders() throws Exception {
+        RouteBuilder[] builders = new RouteBuilder[2];
+        final int groupInterval = 1000;
+        final boolean groupActiveOnly = false;
+
+        builders[0] = new RouteBuilder() {
+            String routeId = "mllp-receiver";
+
+            public void configure() {
+                // Assign the declared id to the route, mirroring the sender route below
+                fromF("mllp:%d?autoAck=true", mllpPort).routeId(routeId)
+                        .convertBodyTo(String.class)
+                        .to(acknowledged)
+                        .process(new PassthroughProcessor("after send to result"))
+                        .log(LoggingLevel.DEBUG, routeId, "Receiving: ${body}")
+                        .toF("log://%s?level=INFO&groupInterval=%d&groupActiveOnly=%b", routeId, groupInterval, groupActiveOnly);
+            }
+        };
+
+        builders[1] = new RouteBuilder() {
+            String routeId = "mllp-sender";
+
+            // NOTE(review): 0.0.0.0 as a connect target is platform-dependent - confirm whether "localhost" is safer
+            String host = "0.0.0.0";
+
+            public void configure() {
+                from(source.getDefaultEndpoint()).routeId(routeId)
+                        .log(LoggingLevel.DEBUG, routeId, "Sending: ${body}")
+                        .toF("mllp://%s:%d", host, mllpPort)
+                        .setBody(header(MllpConstants.MLLP_ACKNOWLEDGEMENT))
+                        .toF("log://%s?level=INFO&groupInterval=%d&groupActiveOnly=%b", routeId, groupInterval, groupActiveOnly);
+            }
+        };
+
+        return builders;
+    }
+
+    /**
+     * Sends one generated message and expects both the message at the receiver mock and an
+     * application-accept acknowledgement at the sender.
+     */
+    @Test
+    public void testLoopbackWithOneMessage() throws Exception {
+        String testMessage = generateMessage();
+        acknowledged.expectedBodiesReceived(testMessage);
+
+        // The (Object) cast presumably steers overload resolution away from the
+        // requestBody variants that take an endpoint URI string - keep it.
+        String acknowledgement = source.requestBody((Object) testMessage, String.class);
+        Assert.assertThat("Should be acknowledgment for message 1", acknowledgement, CoreMatchers.containsString("MSA|AA|00001"));
+
+        assertMockEndpointsSatisfied(60, TimeUnit.SECONDS);
+    }
+
+    /**
+     * Sends 1000 generated messages one after another, checking each acknowledgement for the
+     * matching zero-padded sequence number and each received body for equality with what was sent.
+     */
+    @Test
+    public void testLoopbackMultipleMessages() throws Exception {
+        int messageCount = 1000;
+        acknowledged.expectedMessageCount(messageCount);
+
+        for (int i = 1; i <= messageCount; ++i) {
+            String testMessage = generateMessage(i);
+            // Mock message indexes are zero-based while the generated messages start at 1
+            acknowledged.message(i - 1).body().isEqualTo(testMessage);
+            String acknowledgement = source.requestBody((Object) testMessage, String.class);
+            Assert.assertThat("Should be acknowledgment for message " + i, acknowledgement, CoreMatchers.containsString(String.format("MSA|AA|%05d", i)));
+        }
+
+        assertMockEndpointsSatisfied(60, TimeUnit.SECONDS);
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/camel/blob/bd1661b2/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpClientConsumerBlueprintTest.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpClientConsumerBlueprintTest.java b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpClientConsumerBlueprintTest.java
new file mode 100644
index 0000000..e4ca285
--- /dev/null
+++ b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpClientConsumerBlueprintTest.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mllp;
+
+import org.apache.camel.EndpointInject;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.blueprint.CamelBlueprintTestSupport;
+import org.junit.Ignore;
+
+// TODO: Implement this
+/**
+ * Placeholder for the blueprint-based MLLP consumer test; it carries no test methods yet
+ * and is ignored.
+ */
+@Ignore("Not Yet Implemented")
+public class MllpTcpClientConsumerBlueprintTest extends CamelBlueprintTestSupport {
+    @EndpointInject(uri = "mock://target")
+    MockEndpoint target;
+
+    /**
+     * Returns the path of the blueprint descriptor for this test.
+     */
+    @Override
+    protected String getBlueprintDescriptor() {
+        return "OSGI-INF/blueprint/mllp-tcp-client-consumer.xml";
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/bd1661b2/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpClientProducerAcknowledgementTest.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpClientProducerAcknowledgementTest.java b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpClientProducerAcknowledgementTest.java
new file mode 100644
index 0000000..af41550
--- /dev/null
+++ b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpClientProducerAcknowledgementTest.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mllp;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.LoggingLevel;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.test.AvailablePortFinder;
+import org.apache.camel.test.junit.rule.mllp.MllpServerResource;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Rule;
+import org.junit.Test;
+
+import static org.apache.camel.test.mllp.Hl7MessageGenerator.generateMessage;
+
+
+public class MllpTcpClientProducerAcknowledgementTest extends CamelTestSupport {
+    @Rule
+    public MllpServerResource mllpServer = new MllpServerResource(AvailablePortFinder.getNextAvailable());
+
+    @EndpointInject(uri = "direct://source")
+    ProducerTemplate source;
+
+    @EndpointInject(uri = "mock://complete")
+    MockEndpoint complete;
+
+    @EndpointInject(uri = "mock://aa-ack")
+    MockEndpoint accept;
+    @EndpointInject(uri = "mock://ae-nack")
+    MockEndpoint error;
+    @EndpointInject(uri = "mock://ar-nack")
+    MockEndpoint reject;
+
+    @Override
+    protected CamelContext createCamelContext() throws Exception {
+        DefaultCamelContext context = (DefaultCamelContext) super.createCamelContext();
+
+        context.setUseMDCLogging(true);
+        context.setName(this.getClass().getSimpleName());
+
+        return context;
+    }
+
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            String routeId = "mllp-sender";
+
+            String host = "0.0.0.0";
+            int port = mllpServer.getListenPort();
+
+            public void configure() {
+                onException(MllpApplicationRejectAcknowledgementException.class)
+                        .handled(true)
+                        .to(reject)
+                        .log(LoggingLevel.ERROR, routeId, "AR Acknowledgemnet");
+
+                onException(MllpApplicationErrorAcknowledgementException.class)
+                        .handled(true)
+                        .to(error)
+                        .log(LoggingLevel.ERROR, routeId, "AE Acknowledgement");
+
+                onCompletion()
+                        .onCompleteOnly()
+                        .to(complete)
+                        .log(LoggingLevel.DEBUG, routeId, "AA Acknowledgement");
+
+                from(source.getDefaultEndpoint()).routeId(routeId)
+                        .log(LoggingLevel.INFO, routeId, "Sending Message")
+                        .toF("mllp://%s:%d", host, port)
+                        .log(LoggingLevel.INFO, routeId, "Received Acknowledgement")
+                        .to(accept);
+            }
+        };
+    }
+
+    @Test
+    public void testApplicationAcceptAcknowledgement() throws Exception {
+        complete.setExpectedMessageCount(1);
+        accept.setExpectedMessageCount(1);
+        reject.setExpectedMessageCount(0);
+        error.setExpectedMessageCount(0);
+
+        source.sendBody(generateMessage());
+
+        assertMockEndpointsSatisfied(15, TimeUnit.SECONDS);
+    }
+
+    @Test
+    public void testApplicationRejectAcknowledgement() throws Exception {
+        complete.setExpectedMessageCount(1);
+        accept.setExpectedMessageCount(0);
+        reject.setExpectedMessageCount(1);
+        error.setExpectedMessageCount(0);
+
+        mllpServer.setSendApplicationRejectAcknowledgementModulus(1);
+
+        source.sendBody(generateMessage());
+
+        assertMockEndpointsSatisfied(15, TimeUnit.SECONDS);
+    }
+
+    @Test
+    public void testApplicationErrorAcknowledgement() throws Exception {
+        complete.setExpectedMessageCount(1);
+        accept.setExpectedMessageCount(0);
+        reject.setExpectedMessageCount(0);
+        error.setExpectedMessageCount(1);
+
+        mllpServer.setSendApplicationErrorAcknowledgementModulus(1);
+
+        source.sendBody(generateMessage());
+
+        assertMockEndpointsSatisfied(15, TimeUnit.SECONDS);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/bd1661b2/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpClientProducerBlueprintTest.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpClientProducerBlueprintTest.java b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpClientProducerBlueprintTest.java
new file mode 100644
index 0000000..4233d80
--- /dev/null
+++ b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpClientProducerBlueprintTest.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mllp;
+
+import java.util.Dictionary;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.EndpointInject;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.DefaultComponentResolver;
+import org.apache.camel.spi.ComponentResolver;
+import org.apache.camel.test.blueprint.CamelBlueprintTestSupport;
+import org.apache.camel.test.junit.rule.mllp.MllpServerResource;
+import org.apache.camel.util.KeyValueHolder;
+import org.junit.Rule;
+import org.junit.Test;
+
+import static org.apache.camel.test.mllp.Hl7MessageGenerator.generateMessage;
+
+/**
+ * Blueprint-based test for the MLLP producer: sends a batch of generated HL7 messages to
+ * {@code direct://source} while a {@link MllpServerResource} rule is active, and expects
+ * every one of them at the {@code acknowledged} mock and none at the failure mocks.
+ */
+public class MllpTcpClientProducerBlueprintTest extends CamelBlueprintTestSupport {
+    @Rule
+    public MllpServerResource mllpServer = new MllpServerResource();
+
+    // Endpoint URIs shared by the injection annotations and the property overrides below
+    final String sourceUri = "direct://source";
+    final String mockAcknowledgedUri = "mock://acknowledged";
+    final String mockTimeoutUri = "mock://timeout-ex";
+    final String mockAeExUri = "mock://ae-ack";
+    final String mockArExUri = "mock://ar-ack";
+    final String mockFrameExUri = "mock://frame-ex";
+
+    @EndpointInject(uri = sourceUri)
+    ProducerTemplate source;
+    @EndpointInject(uri = mockAcknowledgedUri)
+    MockEndpoint acknowledged;
+    @EndpointInject(uri = mockTimeoutUri)
+    MockEndpoint timeout;
+    @EndpointInject(uri = mockAeExUri)
+    MockEndpoint ae;
+    @EndpointInject(uri = mockArExUri)
+    MockEndpoint ar;
+    @EndpointInject(uri = mockFrameExUri)
+    MockEndpoint frame;
+
+    /**
+     * Returns the path of the blueprint descriptor for this test.
+     */
+    @Override
+    protected String getBlueprintDescriptor() {
+        return "OSGI-INF/blueprint/mllp-tcp-client-producer-test.xml";
+    }
+
+    /**
+     * Supplies the property overrides: the endpoint URIs declared on this class and the
+     * port the test MLLP server is listening on.
+     */
+    @Override
+    protected Properties useOverridePropertiesWithPropertiesComponent() {
+        Properties overrides = new Properties();
+
+        // Port of the MLLP server rule
+        overrides.setProperty("mllp.port", Integer.toString(mllpServer.getListenPort()));
+
+        // Source and success endpoints
+        overrides.setProperty("sourceUri", sourceUri);
+        overrides.setProperty("acknowledgedUri", mockAcknowledgedUri);
+
+        // Failure endpoints
+        overrides.setProperty("timeoutUri", mockTimeoutUri);
+        overrides.setProperty("frameErrorUri", mockFrameExUri);
+        overrides.setProperty("errorAcknowledgementUri", mockAeExUri);
+        overrides.setProperty("rejectAcknowledgementUri", mockArExUri);
+
+        return overrides;
+    }
+
+    /*
+        Passing the port through config admin instead doesn't seem to work
+        @Override
+        protected String useOverridePropertiesWithConfigAdmin(Dictionary props) throws Exception {
+
+            props.put("mllp.port", mllpServer.getListenPort() );
+
+            return "MllpTcpClientProducer";
+        }
+    */
+
+    /**
+     * Registers a {@link DefaultComponentResolver} as a {@link ComponentResolver} service
+     * with the property {@code component=mllp}.
+     */
+    @Override
+    protected void addServicesOnStartup(Map<String, KeyValueHolder<Object, Dictionary>> services) {
+        services.put(ComponentResolver.class.getName(), asService(new DefaultComponentResolver(), "component", "mllp"));
+    }
+
+    /**
+     * Sends 500 messages, each with its zero-padded index in the
+     * {@code CamelMllpMessageControlId} header, and expects all of them to be acknowledged.
+     */
+    @Test
+    public void testSendMultipleMessages() throws Exception {
+        final int total = 500;
+
+        acknowledged.setExpectedMessageCount(total);
+        ae.setExpectedMessageCount(0);
+        ar.setExpectedMessageCount(0);
+        frame.setExpectedMessageCount(0);
+        timeout.setExpectedMessageCount(0);
+
+        // Uncomment one of these lines to see the NACKs handled
+        // mllpServer.setSendApplicationRejectAcknowledgementModulus(10);
+        // mllpServer.setSendApplicationErrorAcknowledgementModulus(10);
+
+        for (int index = 0; index < total; index++) {
+            log.debug("Triggering message {}", index);
+            String hl7Message = generateMessage(index);
+            String controlId = String.format("%05d", index);
+            Object reply = source.requestBodyAndHeader(hl7Message, "CamelMllpMessageControlId", controlId);
+            log.debug("response {}\n{}", index, reply);
+        }
+
+        assertMockEndpointsSatisfied(15, TimeUnit.SECONDS);
+    }
+}


Mime
View raw message