camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject [1/6] camel git commit: CAMEL-10511: Updated MllpTcpClientProducer and MllpTcpServerConsumer to consume all available data on socket - backport to 2.17
Date Fri, 16 Dec 2016 21:59:10 GMT
Repository: camel
Updated Branches:
  refs/heads/camel-2.17.x 55c621f99 -> a53540da1


http://git-wip-us.apache.org/repos/asf/camel/blob/a53540da/components/camel-mllp/src/test/java/org/apache/camel/test/junit/rule/mllp/MllpServerResource.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/test/java/org/apache/camel/test/junit/rule/mllp/MllpServerResource.java b/components/camel-mllp/src/test/java/org/apache/camel/test/junit/rule/mllp/MllpServerResource.java
index 252e228..4191468 100644
--- a/components/camel-mllp/src/test/java/org/apache/camel/test/junit/rule/mllp/MllpServerResource.java
+++ b/components/camel-mllp/src/test/java/org/apache/camel/test/junit/rule/mllp/MllpServerResource.java
@@ -19,6 +19,7 @@ package org.apache.camel.test.junit.rule.mllp;
 import java.io.BufferedOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 import java.net.BindException;
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
@@ -45,8 +46,6 @@ import static org.apache.camel.component.mllp.MllpEndpoint.START_OF_BLOCK;
  *
  * The server can be configured to simulate a large number
  * of error conditions.
- *
- * TODO:  This needs to be looked at - it may be orphaning threads
  */
 public class MllpServerResource extends ExternalResource {
     Logger log = LoggerFactory.getLogger(this.getClass());
@@ -59,6 +58,12 @@ public class MllpServerResource extends ExternalResource {
 
     boolean active = true;
 
+    int delayBeforeStartOfBlock;
+    int delayBeforeAcknowledgement;
+    int delayDuringAcknowledgement;
+    int delayAfterAcknowledgement;
+    int delayAfterEndOfBlock;
+
     int excludeStartOfBlockModulus;
     int excludeEndOfBlockModulus;
     int excludeEndOfDataModulus;
@@ -67,8 +72,11 @@ public class MllpServerResource extends ExternalResource {
 
     int sendOutOfBandDataModulus;
 
-    int disconnectBeforeAcknowledgementModulus;
-    int disconnectAfterAcknowledgementModulus;
+    int closeSocketBeforeAcknowledgementModulus;
+    int closeSocketAfterAcknowledgementModulus;
+
+    int resetSocketBeforeAcknowledgementModulus;
+    int resetSocketAfterAcknowledgementModulus;
 
     int sendApplicationRejectAcknowledgementModulus;
     int sendApplicationErrorAcknowledgementModulus;
@@ -76,6 +84,8 @@ public class MllpServerResource extends ExternalResource {
     Pattern sendApplicationRejectAcknowledgementPattern;
     Pattern sendApplicationErrorAcknowledgementPattern;
 
+    String acknowledgementString;
+
     ServerSocketThread serverSocketThread;
 
     public MllpServerResource() {
@@ -166,12 +176,52 @@ public class MllpServerResource extends ExternalResource {
         serverSocketThread.interrupt();
     }
 
+    public int getDelayBeforeStartOfBlock() {
+        return delayBeforeStartOfBlock;
+    }
+
+    public void setDelayBeforeStartOfBlock(int delayBeforeStartOfBlock) {
+        this.delayBeforeStartOfBlock = delayBeforeStartOfBlock;
+    }
+
+    public int getDelayBeforeAcknowledgement() {
+        return delayBeforeAcknowledgement;
+    }
+
+    public void setDelayBeforeAcknowledgement(int delayBeforeAcknowledgement) {
+        this.delayBeforeAcknowledgement = delayBeforeAcknowledgement;
+    }
+
+    public int getDelayDuringAcknowledgement() {
+        return delayDuringAcknowledgement;
+    }
+
+    public void setDelayDuringAcknowledgement(int delayDuringAcknowledgement) {
+        this.delayDuringAcknowledgement = delayDuringAcknowledgement;
+    }
+
+    public int getDelayAfterAcknowledgement() {
+        return delayAfterAcknowledgement;
+    }
+
+    public void setDelayAfterAcknowledgement(int delayAfterAcknowledgement) {
+        this.delayAfterAcknowledgement = delayAfterAcknowledgement;
+    }
+
+    public int getDelayAfterEndOfBlock() {
+        return delayAfterEndOfBlock;
+    }
+
+    public void setDelayAfterEndOfBlock(int delayAfterEndOfBlock) {
+        this.delayAfterEndOfBlock = delayAfterEndOfBlock;
+    }
+
     public boolean sendApplicationRejectAcknowledgement(String hl7Message) {
-        return evaluatePatten(hl7Message, this.sendApplicationErrorAcknowledgementPattern);
+        return evaluatePattern(hl7Message, this.sendApplicationErrorAcknowledgementPattern);
     }
 
     public boolean sendApplicationErrorAcknowledgement(String hl7Message) {
-        return evaluatePatten(hl7Message, this.sendApplicationRejectAcknowledgementPattern);
+        return evaluatePattern(hl7Message, this.sendApplicationRejectAcknowledgementPattern);
     }
 
     public boolean sendApplicationRejectAcknowledgement(int messageCount) {
@@ -198,12 +248,20 @@ public class MllpServerResource extends ExternalResource {
         return evaluateModulus(messageCount, excludeEndOfDataModulus);
     }
 
-    public boolean disconnectBeforeAcknowledgement(int messageCount) {
-        return evaluateModulus(messageCount, disconnectBeforeAcknowledgementModulus);
+    public boolean closeSocketBeforeAcknowledgement(int messageCount) {
+        return evaluateModulus(messageCount, closeSocketBeforeAcknowledgementModulus);
     }
 
-    public boolean disconnectAfterAcknowledgement(int messageCount) {
-        return evaluateModulus(messageCount, disconnectAfterAcknowledgementModulus);
+    public boolean closeSocketAfterAcknowledgement(int messageCount) {
+        return evaluateModulus(messageCount, closeSocketAfterAcknowledgementModulus);
+    }
+
+    public boolean resetSocketBeforeAcknowledgement(int messageCount) {
+        return evaluateModulus(messageCount, resetSocketBeforeAcknowledgementModulus);
+    }
+
+    public boolean resetSocketAfterAcknowledgement(int messageCount) {
+        return evaluateModulus(messageCount, resetSocketAfterAcknowledgementModulus);
     }
 
     public boolean sendOutOfBandData(int messageCount) {
@@ -221,7 +279,7 @@ public class MllpServerResource extends ExternalResource {
         }
     }
 
-    private boolean evaluatePatten(String hl7Message, Pattern pattern) {
+    private boolean evaluatePattern(String hl7Message, Pattern pattern) {
         boolean retValue = false;
 
         if (null != pattern && pattern.matcher(hl7Message).matches()) {
@@ -369,27 +427,51 @@ public class MllpServerResource extends ExternalResource {
         }
     }
 
-    public int getDisconnectBeforeAcknowledgementModulus() {
-        return disconnectBeforeAcknowledgementModulus;
+    public int getCloseSocketBeforeAcknowledgementModulus() {
+        return closeSocketBeforeAcknowledgementModulus;
+    }
+
+    public void setCloseSocketBeforeAcknowledgementModulus(int closeSocketBeforeAcknowledgementModulus)
{
+        if (0 > closeSocketBeforeAcknowledgementModulus) {
+            this.closeSocketBeforeAcknowledgementModulus = 0;
+        } else {
+            this.closeSocketBeforeAcknowledgementModulus = closeSocketBeforeAcknowledgementModulus;
+        }
+    }
+
+    public int getCloseSocketAfterAcknowledgementModulus() {
+        return closeSocketAfterAcknowledgementModulus;
+    }
+
+    public void setCloseSocketAfterAcknowledgementModulus(int closeSocketAfterAcknowledgementModulus)
{
+        if (0 > closeSocketAfterAcknowledgementModulus) {
+            this.closeSocketAfterAcknowledgementModulus = 0;
+        } else {
+            this.closeSocketAfterAcknowledgementModulus = closeSocketAfterAcknowledgementModulus;
+        }
+    }
+
+    public int getResetSocketBeforeAcknowledgementModulus() {
+        return resetSocketBeforeAcknowledgementModulus;
     }
 
-    public void setDisconnectBeforeAcknowledgementModulus(int disconnectBeforeAcknowledgementModulus)
{
-        if (0 > disconnectBeforeAcknowledgementModulus) {
-            this.disconnectBeforeAcknowledgementModulus = 0;
+    public void setResetSocketBeforeAcknowledgementModulus(int resetSocketBeforeAcknowledgementModulus)
{
+        if (0 > resetSocketBeforeAcknowledgementModulus) {
+            this.resetSocketBeforeAcknowledgementModulus = 0;
         } else {
-            this.disconnectBeforeAcknowledgementModulus = disconnectBeforeAcknowledgementModulus;
+            this.resetSocketBeforeAcknowledgementModulus = resetSocketBeforeAcknowledgementModulus;
         }
     }
 
-    public int getDisconnectAfterAcknowledgementModulus() {
-        return disconnectAfterAcknowledgementModulus;
+    public int getResetSocketAfterAcknowledgementModulus() {
+        return resetSocketAfterAcknowledgementModulus;
     }
 
-    public void setDisconnectAfterAcknowledgementModulus(int disconnectAfterAcknowledgementModulus)
{
-        if (0 > disconnectAfterAcknowledgementModulus) {
-            this.disconnectAfterAcknowledgementModulus = 0;
+    public void setResetSocketAfterAcknowledgementModulus(int resetSocketAfterAcknowledgementModulus)
{
+        if (0 > resetSocketAfterAcknowledgementModulus) {
+            this.resetSocketAfterAcknowledgementModulus = 0;
         } else {
-            this.disconnectAfterAcknowledgementModulus = disconnectAfterAcknowledgementModulus;
+            this.resetSocketAfterAcknowledgementModulus = resetSocketAfterAcknowledgementModulus;
         }
     }
 
@@ -433,6 +515,14 @@ public class MllpServerResource extends ExternalResource {
         this.sendApplicationErrorAcknowledgementPattern = sendApplicationErrorAcknowledgementPattern;
     }
 
+    public String getAcknowledgementString() {
+        return acknowledgementString;
+    }
+
+    public void setAcknowledgementString(String acknowledgementString) {
+        this.acknowledgementString = acknowledgementString;
+    }
+
     public ServerSocketThread getServerSocketThread() {
         return serverSocketThread;
     }
@@ -441,41 +531,95 @@ public class MllpServerResource extends ExternalResource {
         this.serverSocketThread = serverSocketThread;
     }
 
-    void closeConnection(Socket socket) {
-        if (null != socket) {
-            if (!socket.isClosed()) {
-                try {
-                    socket.shutdownInput();
-                } catch (Exception ex) {
-                    log.warn("Exception encountered shutting down the input stream on the
client socket", ex);
-                }
-
-                try {
-                    socket.shutdownOutput();
-                } catch (Exception ex) {
-                    log.warn("Exception encountered shutting down the output stream on the
client socket", ex);
-                }
+    public void closeClientConnections() {
+        if (serverSocketThread != null) {
+            serverSocketThread.closeClientConnections();
+        }
+    }
 
-                try {
-                    socket.close();
-                } catch (Exception ex) {
-                    log.warn("Exception encountered closing the client socket", ex);
-                }
-            }
+    public void resetClientConnections() {
+        if (serverSocketThread != null) {
+            serverSocketThread.resetClientConnections();
         }
     }
 
-    void resetConnection(Socket socket) {
-        if (null != socket && !socket.isClosed()) {
-            try {
-                socket.setSoLinger(true, 0);
-            } catch (SocketException socketEx) {
-                log.debug("SocketException encountered setting SO_LINGER to 0 on the socket
to force a reset - ignoring", socketEx);
-            } finally {
-                closeConnection(socket);
+    /**
+     * Generates a HL7 Application Acknowledgement
+     *
+     * @param hl7Message          HL7 message that is being acknowledged
+     * @param acknowledgementCode AA, AE or AR
+     * @return a HL7 Application Acknowledgement
+     */
+    protected String generateAcknowledgement(String hl7Message, String acknowledgementCode)
{
+        final String defaulNackMessage =
+                "MSH|^~\\&|||||||NACK||P|2.2" + SEGMENT_DELIMITER
+                        + "MSA|AR|" + SEGMENT_DELIMITER
+                        + MESSAGE_TERMINATOR;
+
+        if (hl7Message == null) {
+            log.error("Invalid HL7 message for parsing operation. Please check your inputs");
+            return defaulNackMessage;
+        }
+
+        if (!("AA".equals(acknowledgementCode) || "AE".equals(acknowledgementCode) || "AR".equals(acknowledgementCode)))
{
+            throw new IllegalArgumentException("Acknowledgemnt Code must be AA, AE or AR:
" + acknowledgementCode);
+        }
+
+        String messageControlId;
+
+        int endOfMshSegment = hl7Message.indexOf(SEGMENT_DELIMITER);
+        if (-1 != endOfMshSegment) {
+            String mshSegment = hl7Message.substring(0, endOfMshSegment);
+            char fieldSeparator = mshSegment.charAt(3);
+            String fieldSeparatorPattern = Pattern.quote("" + fieldSeparator);
+            String[] mshFields = mshSegment.split(fieldSeparatorPattern);
+            if (mshFields.length == 0) {
+                log.error("Failed to split MSH Segment into fields");
+            } else {
+                StringBuilder ackBuilder = new StringBuilder(mshSegment.length() + 25);
+                // Build the MSH Segment
+                ackBuilder
+                        .append(mshFields[0]).append(fieldSeparator)
+                        .append(mshFields[1]).append(fieldSeparator)
+                        .append(mshFields[4]).append(fieldSeparator)
+                        .append(mshFields[5]).append(fieldSeparator)
+                        .append(mshFields[2]).append(fieldSeparator)
+                        .append(mshFields[3]).append(fieldSeparator)
+                        .append(mshFields[6]).append(fieldSeparator)
+                        .append(mshFields[7]).append(fieldSeparator)
+                        .append("ACK")
+                        .append(mshFields[8].substring(3));
+                for (int i = 9; i < mshFields.length; ++i) {
+                    ackBuilder.append(fieldSeparator).append(mshFields[i]);
+                }
+                // Empty fields at the end are not preserved by String.split, so preserve them
+                int emptyFieldIndex = mshSegment.length() - 1;
+                if (fieldSeparator == mshSegment.charAt(mshSegment.length() - 1)) {
+                    ackBuilder.append(fieldSeparator);
+                    while (emptyFieldIndex >= 1 && mshSegment.charAt(emptyFieldIndex)
== mshSegment.charAt(emptyFieldIndex - 1)) {
+                        ackBuilder.append(fieldSeparator);
+                        --emptyFieldIndex;
+                    }
+                }
+                ackBuilder.append(SEGMENT_DELIMITER);
+
+                // Build the MSA Segment
+                ackBuilder
+                        .append("MSA").append(fieldSeparator)
+                        .append(acknowledgementCode).append(fieldSeparator)
+                        .append(mshFields[9]).append(fieldSeparator)
+                        .append(SEGMENT_DELIMITER);
+
+                // Terminate the message
+                ackBuilder.append(MESSAGE_TERMINATOR);
+
+                return ackBuilder.toString();
             }
+        } else {
+            log.error("Failed to find the end of the  MSH Segment");
         }
 
+        return null;
     }
 
     /**
@@ -499,22 +643,22 @@ public class MllpServerResource extends ExternalResource {
 
         boolean raiseExceptionOnAcceptTimeout;
 
-        public ServerSocketThread() throws IOException {
+        ServerSocketThread() throws IOException {
             bind();
         }
 
-        public ServerSocketThread(int listenPort) throws IOException {
+        ServerSocketThread(int listenPort) throws IOException {
             this.listenPort = listenPort;
             bind();
         }
 
-        public ServerSocketThread(int listenPort, int backlog) throws IOException {
+        ServerSocketThread(int listenPort, int backlog) throws IOException {
             this.listenPort = listenPort;
             this.backlog = backlog;
             bind();
         }
 
-        public ServerSocketThread(String listenHost, int listenPort, int backlog) throws
IOException {
+        ServerSocketThread(String listenHost, int listenPort, int backlog) throws IOException
{
             this.listenHost = listenHost;
             this.listenPort = listenPort;
             this.backlog = backlog;
@@ -566,6 +710,22 @@ public class MllpServerResource extends ExternalResource {
             log.info("Opened TCP Listener on port {}", serverSocket.getLocalPort());
         }
 
+        void closeClientConnections() {
+            if (clientSocketThreads != null) {
+                for (ClientSocketThread clientSocketThread : clientSocketThreads) {
+                    clientSocketThread.closeConnection();
+                }
+            }
+        }
+
+        void resetClientConnections() {
+            if (clientSocketThreads != null) {
+                for (ClientSocketThread clientSocketThread : clientSocketThreads) {
+                    clientSocketThread.resetConnection();
+                }
+            }
+        }
+
         /**
          * Accept TCP connections and create ClientSocketThreads for them
          */
@@ -586,7 +746,16 @@ public class MllpServerResource extends ExternalResource {
                     if (null == clientSocket) {
                         continue;
                     } else if (!clientSocket.isClosed()) {
-                        resetConnection(clientSocket);
+                        try {
+                            clientSocket.setSoLinger(true, 0);
+                        } catch (SocketException soLingerEx) {
+                            log.warn("Ignoring SocketException encountered when setting SO_LINGER
in preparation of resetting client Socket", soLingerEx);
+                        }
+                        try {
+                            clientSocket.close();
+                        } catch (IOException ioEx) {
+                            log.warn("Ignoring IOException encountered when resetting client
Socket", ioEx);
+                        }
                         continue;
                     } else {
                         throw new MllpJUnitResourceException("Unexpected SocketException
encountered accepting client connection", socketEx);
@@ -685,6 +854,10 @@ public class MllpServerResource extends ExternalResource {
             }
             super.interrupt();
         }
+
+        public void close() {
+
+        }
     }
 
     /**
@@ -711,6 +884,35 @@ public class MllpServerResource extends ExternalResource {
             this.clientSocket = clientSocket;
         }
 
+        void closeConnection() {
+            if (clientSocket != null && !clientSocket.isClosed()) {
+                try {
+                    clientSocket.close();
+                } catch (IOException ioEx) {
+                    log.warn("Ignoring IOException encountered when closing client Socket",
ioEx);
+                } finally {
+                    clientSocket = null;
+                }
+            }
+        }
+
+        void resetConnection() {
+            if (clientSocket != null && !clientSocket.isClosed()) {
+                try {
+                    clientSocket.setSoLinger(true, 0);
+                } catch (SocketException socketEx) {
+                    log.warn("Ignoring SocketException encountered when setting SO_LINGER
in preparation of resetting client Socket", socketEx);
+                }
+                try {
+                    clientSocket.close();
+                } catch (IOException ioEx) {
+                    log.warn("Ignoring IOException encountered when resetting client Socket",
ioEx);
+                } finally {
+                    clientSocket = null;
+                }
+            }
+        }
+
         /**
          * Receives HL7 messages and replies with HL7 Acknowledgements.
          *
@@ -738,12 +940,30 @@ public class MllpServerResource extends ExternalResource {
                     } catch (Exception unexpectedEx) {
                         throw new MllpJUnitResourceException("Unexpected exception encounted
getting input stream", unexpectedEx);
                     }
-                    String parsedHL7Message = getMessage(instream);
+                    String parsedHL7Message;
+                    try {
+                        parsedHL7Message = getMessage(instream);
+                    } catch (SocketTimeoutException timeoutEx) {
+                        log.info("Waiting for message from client");
+                        continue;
+                    }
 
                     if (null != parsedHL7Message && parsedHL7Message.length() >
0) {
                         ++messageCounter;
-                        if (disconnectBeforeAcknowledgement(messageCounter)) {
-                            log.warn("Disconnecting before sending acknowledgement");
+                        if (closeSocketBeforeAcknowledgement(messageCounter)) {
+                            log.warn("Closing socket before sending acknowledgement");
+                            clientSocket.shutdownInput();
+                            clientSocket.shutdownOutput();
+                            clientSocket.close();
+                            break;
+                        }
+                        if (resetSocketBeforeAcknowledgement(messageCounter)) {
+                            log.warn("Resetting socket before sending acknowledgement");
+                            try {
+                                clientSocket.setSoLinger(true, 0);
+                            } catch (IOException ioEx) {
+                                log.warn("Ignoring IOException encountered setting SO_LINGER
when prepareing to reset socket", ioEx);
+                            }
                             clientSocket.shutdownInput();
                             clientSocket.shutdownOutput();
                             clientSocket.close();
@@ -752,39 +972,67 @@ public class MllpServerResource extends ExternalResource {
 
                         String acknowledgmentMessage;
 
-                        if (sendApplicationErrorAcknowledgement(messageCounter) || sendApplicationErrorAcknowledgement(parsedHL7Message))
{
-                            acknowledgmentMessage = generateAcknowledgementMessage(parsedHL7Message,
"AE");
-                        } else if (sendApplicationRejectAcknowledgement(messageCounter) ||
sendApplicationRejectAcknowledgement(parsedHL7Message)) {
-                            acknowledgmentMessage = generateAcknowledgementMessage(parsedHL7Message,
"AR");
+                        if (acknowledgementString == null) {
+                            if (sendApplicationErrorAcknowledgement(messageCounter) || sendApplicationErrorAcknowledgement(parsedHL7Message))
{
+                                acknowledgmentMessage = generateAcknowledgementMessage(parsedHL7Message,
"AE");
+                            } else if (sendApplicationRejectAcknowledgement(messageCounter)
|| sendApplicationRejectAcknowledgement(parsedHL7Message)) {
+                                acknowledgmentMessage = generateAcknowledgementMessage(parsedHL7Message,
"AR");
+                            } else {
+                                acknowledgmentMessage = generateAcknowledgementMessage(parsedHL7Message);
+                            }
                         } else {
-                            acknowledgmentMessage = generateAcknowledgementMessage(parsedHL7Message);
-
+                            acknowledgmentMessage = acknowledgementString;
                         }
+
                         BufferedOutputStream outstream = new BufferedOutputStream(clientSocket.getOutputStream());
 
                         if (sendOutOfBandData(messageCounter)) {
                             byte[] outOfBandDataBytes = "Out Of Band Hl7MessageGenerator".getBytes();
                             outstream.write(outOfBandDataBytes, 0, outOfBandDataBytes.length);
-
                         }
+
                         if (excludeStartOfBlock(messageCounter)) {
-                            log.warn("NOT sending bMLLP_ENVELOPE_START_OF_BLOCK");
+                            log.warn("NOT sending START_OF_BLOCK");
                         } else {
                             outstream.write(START_OF_BLOCK);
+                            if (delayBeforeStartOfBlock > 0) {
+                                uncheckedSleep(delayBeforeStartOfBlock);
+                                uncheckedFlush(outstream);
+                            }
                         }
 
                         if (excludeAcknowledgement(messageCounter)) {
                             log.info("NOT sending Acknowledgement body");
                         } else {
+                            if (delayBeforeAcknowledgement > 0) {
+                                uncheckedSleep(delayBeforeAcknowledgement);
+                            }
                             log.debug("Buffering Acknowledgement\n\t{}", acknowledgmentMessage.replace('\r',
'\n'));
                             byte[] ackBytes = acknowledgmentMessage.getBytes();
-                            outstream.write(ackBytes, 0, ackBytes.length);
+                            if (delayDuringAcknowledgement > 0) {
+                                int firstHalf = ackBytes.length / 2;
+                                outstream.write(ackBytes, 0, firstHalf);
+                                uncheckedFlush(outstream);
+                                uncheckedSleep(delayDuringAcknowledgement);
+                                outstream.write(ackBytes, firstHalf, ackBytes.length - firstHalf);
+                                uncheckedFlush(outstream);
+                            } else {
+                                outstream.write(ackBytes, 0, ackBytes.length);
+                            }
+                            if (delayAfterAcknowledgement > 0) {
+                                uncheckedFlush(outstream);
+                                uncheckedSleep(delayAfterAcknowledgement);
+                            }
                         }
 
                         if (excludeEndOfBlock(messageCounter)) {
                             log.warn("NOT sending bMLLP_ENVELOPE_END_OF_BLOCK");
                         } else {
                             outstream.write(END_OF_BLOCK);
+                            if (delayAfterEndOfBlock > 0) {
+                                uncheckedFlush(outstream);
+                                uncheckedSleep(delayAfterEndOfBlock);
+                            }
                         }
 
                         if (excludeEndOfData(messageCounter)) {
@@ -794,9 +1042,9 @@ public class MllpServerResource extends ExternalResource {
                         }
 
                         log.debug("Writing Acknowledgement\n\t{}", acknowledgmentMessage.replace('\r',
'\n'));
-                        outstream.flush();
+                        uncheckedFlush(outstream);
 
-                        if (disconnectAfterAcknowledgement(messageCounter)) {
+                        if (closeSocketAfterAcknowledgement(messageCounter)) {
                             log.info("Closing Client");
                             clientSocket.shutdownInput();
                             clientSocket.shutdownOutput();
@@ -806,16 +1054,18 @@ public class MllpServerResource extends ExternalResource {
                     }
                 }
             } catch (IOException e) {
-                String errorMessage = "Error whiling reading and writing to clientSocket";
+                String errorMessage = "Error while reading and writing from clientSocket";
                 log.error(errorMessage, e);
                 throw new MllpJUnitResourceException(errorMessage, e);
             } finally {
-                try {
-                    clientSocket.close();
-                } catch (IOException e) {
-                    String errorMessage = "Error whiling attempting to close to client Socket";
-                    log.error(errorMessage, e);
-                    throw new MllpJUnitResourceException(errorMessage, e);
+                if (clientSocket != null) {
+                    try {
+                        clientSocket.close();
+                    } catch (IOException e) {
+                        String errorMessage = "Error while attempting to close to client
Socket";
+                        log.error(errorMessage, e);
+                        throw new MllpJUnitResourceException(errorMessage, e);
+                    }
                 }
             }
 
@@ -829,10 +1079,8 @@ public class MllpServerResource extends ExternalResource {
          * @return the MLLP payload
          * @throws IOException when the underlying Java Socket calls raise these exceptions
          */
-        // TODO:  Enhance this to detect non-HL7 data (i.e. look for MSH after START_OF_BLOCK)
         public String getMessage(InputStream anInputStream) throws IOException {
             try {
-                // TODO:  Enhance this to read a bunch of characters and log, rather than
log them one at a time
                 boolean waitingForStartOfBlock = true;
                 while (waitingForStartOfBlock) {
                     int potentialStartCharacter = anInputStream.read();
@@ -847,14 +1095,16 @@ public class MllpServerResource extends ExternalResource {
                     }
                 }
             } catch (SocketException socketEx) {
-                if (clientSocket.isClosed()) {
-                    log.info("Client socket closed while waiting for MLLP_ENVELOPE_START_OF_BLOCK");
-                } else if (clientSocket.isConnected()) {
-                    log.info("SocketException encountered while waiting for MLLP_ENVELOPE_START_OF_BLOCK");
-                    resetConnection(clientSocket);
-                } else {
-                    log.error("Unable to read from socket stream when expected bMLLP_ENVELOPE_START_OF_BLOCK
- resetting connection ", socketEx);
-                    resetConnection(clientSocket);
+                if (clientSocket != null) {
+                    if (clientSocket.isClosed()) {
+                        log.info("Client socket closed while waiting for MLLP_ENVELOPE_START_OF_BLOCK");
+                    } else if (clientSocket.isConnected()) {
+                        log.info("SocketException encountered while waiting for MLLP_ENVELOPE_START_OF_BLOCK");
+                        resetConnection();
+                    } else {
+                        log.error("Unable to read from socket stream when expected bMLLP_ENVELOPE_START_OF_BLOCK
- resetting connection ", socketEx);
+                        resetConnection();
+                    }
                 }
                 return null;
             }
@@ -907,75 +1157,7 @@ public class MllpServerResource extends ExternalResource {
          * @return a HL7 Application Acknowledgement
          */
         private String generateAcknowledgementMessage(String hl7Message, String acknowledgementCode) {
-            final String defaulNackMessage =
-                    "MSH|^~\\&|||||||NACK||P|2.2" + SEGMENT_DELIMITER
-                            + "MSA|AR|" + SEGMENT_DELIMITER
-                            + MESSAGE_TERMINATOR;
-
-            if (hl7Message == null) {
-                log.error("Invalid HL7 message for parsing operation. Please check your inputs");
-                return defaulNackMessage;
-            }
-
-            if (!("AA".equals(acknowledgementCode) || "AE".equals(acknowledgementCode) || "AR".equals(acknowledgementCode))) {
-                throw new IllegalArgumentException("Acknowledgemnt Code must be AA, AE or AR: " + acknowledgementCode);
-            }
-
-            String messageControlId;
-
-            int endOfMshSegment = hl7Message.indexOf(SEGMENT_DELIMITER);
-            if (-1 != endOfMshSegment) {
-                String mshSegment = hl7Message.substring(0, endOfMshSegment);
-                char fieldSeparator = mshSegment.charAt(3);
-                String fieldSeparatorPattern = Pattern.quote("" + fieldSeparator);
-                String[] mshFields = mshSegment.split(fieldSeparatorPattern);
-                if (mshFields.length == 0) {
-                    log.error("Failed to split MSH Segment into fields");
-                } else {
-                    StringBuilder ackBuilder = new StringBuilder(mshSegment.length() + 25);
-                    // Build the MSH Segment
-                    ackBuilder
-                            .append(mshFields[0]).append(fieldSeparator)
-                            .append(mshFields[1]).append(fieldSeparator)
-                            .append(mshFields[4]).append(fieldSeparator)
-                            .append(mshFields[5]).append(fieldSeparator)
-                            .append(mshFields[2]).append(fieldSeparator)
-                            .append(mshFields[3]).append(fieldSeparator)
-                            .append(mshFields[6]).append(fieldSeparator)
-                            .append(mshFields[7]).append(fieldSeparator)
-                            .append("ACK")
-                            .append(mshFields[8].substring(3));
-                    for (int i = 9; i < mshFields.length; ++i) {
-                        ackBuilder.append(fieldSeparator).append(mshFields[i]);
-                    }
-                    // Empty fields at the end are not preserved by String.split, so preserve them
-                    int emptyFieldIndex = mshSegment.length() - 1;
-                    if (fieldSeparator == mshSegment.charAt(mshSegment.length() - 1)) {
-                        ackBuilder.append(fieldSeparator);
-                        while (emptyFieldIndex >= 1 && mshSegment.charAt(emptyFieldIndex) == mshSegment.charAt(emptyFieldIndex - 1)) {
-                            ackBuilder.append(fieldSeparator);
-                            --emptyFieldIndex;
-                        }
-                    }
-                    ackBuilder.append(SEGMENT_DELIMITER);
-
-                    // Build the MSA Segment
-                    ackBuilder
-                            .append("MSA").append(fieldSeparator)
-                            .append(acknowledgementCode).append(fieldSeparator)
-                            .append(mshFields[9]).append(fieldSeparator)
-                            .append(SEGMENT_DELIMITER);
-
-                    // Terminate the message
-                    ackBuilder.append(MESSAGE_TERMINATOR);
-
-                    return ackBuilder.toString();
-                }
-            } else {
-                log.error("Failed to find the end of the  MSH Segment");
-            }
-
-            return null;
+            return generateAcknowledgement(hl7Message, acknowledgementCode);
         }
 
         @Override
@@ -989,6 +1171,23 @@ public class MllpServerResource extends ExternalResource {
             }
             super.interrupt();
         }
+
+        private void uncheckedSleep(long milliseconds) {
+            try {
+                Thread.sleep(milliseconds);
+            } catch (InterruptedException e) {
+                log.warn("Sleep interrupted", e);
+            }
+
+        }
+
+        private void uncheckedFlush(OutputStream outputStream) {
+            try {
+                outputStream.flush();
+            } catch (IOException e) {
+                log.warn("Ignoring exception caught while flushing output stream", e);
+            }
+        }
     }
 
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/a53540da/components/camel-mllp/src/test/java/org/apache/camel/test/util/PayloadBuilder.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/test/java/org/apache/camel/test/util/PayloadBuilder.java b/components/camel-mllp/src/test/java/org/apache/camel/test/util/PayloadBuilder.java
new file mode 100644
index 0000000..3959b43
--- /dev/null
+++ b/components/camel-mllp/src/test/java/org/apache/camel/test/util/PayloadBuilder.java
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.test.util;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+public class PayloadBuilder {
+    ByteArrayOutputStream builderStream = new ByteArrayOutputStream();
+
+    public PayloadBuilder() {
+    }
+
+    public PayloadBuilder(byte b) throws IOException {
+        this.append(b);
+    }
+
+    public PayloadBuilder(byte[] bytes) throws IOException {
+        this.append(bytes);
+    }
+
+    public PayloadBuilder(char... chars) throws IOException {
+        this.append(chars);
+    }
+
+    public PayloadBuilder(String... strings) throws IOException {
+        this.append(strings);
+    }
+
+    public PayloadBuilder append(byte b) throws IOException {
+        builderStream.write(b);
+
+        return this;
+    }
+
+    public PayloadBuilder append(byte[] bytes) throws IOException {
+        builderStream.write(bytes);
+
+        return this;
+    }
+
+    public PayloadBuilder append(char... chars) throws IOException {
+        if (chars != null) {
+            for (char c : chars) {
+                builderStream.write(c);
+            }
+        }
+
+        return this;
+    }
+
+    public PayloadBuilder append(String... strings) throws IOException {
+        if (strings != null) {
+            for (String s : strings) {
+                builderStream.write(s.getBytes());
+            }
+        }
+
+        return this;
+    }
+
+    public PayloadBuilder append(byte[] payload, int startPosition, int length) throws IOException {
+        builderStream.write(payload, startPosition, length);
+
+        return this;
+    }
+
+    public byte[] build() {
+        byte[] answer = builderStream.toByteArray();
+
+        builderStream.reset();
+
+        return answer;
+    }
+
+    public static byte[] build(byte b) {
+        try {
+            return new PayloadBuilder(b).build();
+        } catch (IOException e) {
+            throw new RuntimeException("PayloadBuilder.build(byte) failure", e);
+        }
+    }
+
+    public static byte[] build(byte b, byte... bytes) {
+        try {
+            return new PayloadBuilder(b).append(bytes).build();
+        } catch (IOException e) {
+            throw new RuntimeException("PayloadBuilder.build(byte) failure", e);
+        }
+    }
+
+    public static byte[] build(byte[] bytes) {
+        try {
+            return new PayloadBuilder(bytes).build();
+        } catch (IOException e) {
+            throw new RuntimeException("PayloadBuilder.build(byte[]) failure", e);
+        }
+    }
+
+    public static byte[] build(char c) {
+        try {
+            return new PayloadBuilder(c).build();
+        } catch (IOException e) {
+            throw new RuntimeException("PayloadBuilder.build(char...) failure", e);
+        }
+    }
+
+    public static byte[] build(char c, char... chars) {
+        try {
+            return new PayloadBuilder(c).append(chars).build();
+        } catch (IOException e) {
+            throw new RuntimeException("PayloadBuilder.build(char...) failure", e);
+        }
+    }
+
+    public static byte[] build(char[] chars) {
+        try {
+            return new PayloadBuilder(chars).build();
+        } catch (IOException e) {
+            throw new RuntimeException("PayloadBuilder.build(char...) failure", e);
+        }
+    }
+
+    public static byte[] build(String s) {
+        try {
+            return new PayloadBuilder(s).build();
+        } catch (IOException e) {
+            throw new RuntimeException("PayloadBuilder.build(String) failure", e);
+        }
+    }
+
+    public static byte[] build(String[] strings) {
+        try {
+            return new PayloadBuilder(strings).build();
+        } catch (IOException e) {
+            throw new RuntimeException("PayloadBuilder.build(String[]) failure", e);
+        }
+    }
+
+    public static byte[] build(char start, String s) {
+        try {
+            return new PayloadBuilder(start)
+                    .append(s)
+                    .build();
+        } catch (IOException e) {
+            throw new RuntimeException("PayloadBuilder.build(String) failure", e);
+        }
+    }
+
+    public static byte[] build(char start, String s, char... end) {
+        try {
+            return new PayloadBuilder(start)
+                    .append(s)
+                    .append(end)
+                    .build();
+        } catch (IOException e) {
+            throw new RuntimeException("PayloadBuilder.build(char, String, char...) failure", e);
+        }
+    }
+
+    public static byte[] build(char start, byte[] bytes, char... end) {
+        try {
+            return new PayloadBuilder(start)
+                    .append(bytes)
+                    .append(end).build();
+        } catch (IOException e) {
+            throw new RuntimeException("PayloadBuilder.build(char, byte[], char...) failure", e);
+        }
+    }
+
+    public static byte[] build(String s, char... end) {
+        try {
+            return new PayloadBuilder(s)
+                    .append(end).build();
+        } catch (IOException e) {
+            throw new RuntimeException("PayloadBuilder.build(String, char...) failure", e);
+        }
+    }
+
+    public static byte[] build(byte[] bytes, char... end) {
+        try {
+            return new PayloadBuilder(bytes)
+                    .append(end).build();
+        } catch (IOException e) {
+            throw new RuntimeException("PayloadBuilder.build(byte[], char...) failure", e);
+        }
+    }
+
+    public static byte[] build(byte[] bytes, String s) {
+        try {
+            return new PayloadBuilder(bytes)
+                    .append(s).build();
+        } catch (IOException e) {
+            throw new RuntimeException("PayloadBuilder.build(byte[], String) failure", e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/a53540da/components/camel-mllp/src/test/resources/OSGI-INF/blueprint/mllp-tcp-client-producer-test.xml
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/test/resources/OSGI-INF/blueprint/mllp-tcp-client-producer-test.xml b/components/camel-mllp/src/test/resources/OSGI-INF/blueprint/mllp-tcp-client-producer-test.xml
index 03a07a4..e3c10ae 100644
--- a/components/camel-mllp/src/test/resources/OSGI-INF/blueprint/mllp-tcp-client-producer-test.xml
+++ b/components/camel-mllp/src/test/resources/OSGI-INF/blueprint/mllp-tcp-client-producer-test.xml
@@ -61,7 +61,7 @@
         </onException>
 
         <onException>
-            <exception>org.apache.camel.component.mllp.MllpCorruptFrameException</exception>
+            <exception>org.apache.camel.component.mllp.MllpFrameException</exception>
             <redeliveryPolicy logHandled="true"/>
             <handled>
                 <constant>true</constant>


Mime
View raw message