Return-Path: X-Original-To: apmail-camel-commits-archive@www.apache.org Delivered-To: apmail-camel-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 3A2ED182BB for ; Wed, 30 Dec 2015 09:36:50 +0000 (UTC) Received: (qmail 72359 invoked by uid 500); 30 Dec 2015 09:36:50 -0000 Delivered-To: apmail-camel-commits-archive@camel.apache.org Received: (qmail 72215 invoked by uid 500); 30 Dec 2015 09:36:50 -0000 Mailing-List: contact commits-help@camel.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@camel.apache.org Delivered-To: mailing list commits@camel.apache.org Received: (qmail 72096 invoked by uid 99); 30 Dec 2015 09:36:50 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 30 Dec 2015 09:36:50 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id D7BC1E08DB; Wed, 30 Dec 2015 09:36:49 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: davsclaus@apache.org To: commits@camel.apache.org Date: Wed, 30 Dec 2015 09:36:51 -0000 Message-Id: <4578c575bc2b486faf10781d672efcb4@git.apache.org> In-Reply-To: <41b1fe56697e42d984fe6db77510c61b@git.apache.org> References: <41b1fe56697e42d984fe6db77510c61b@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [3/6] camel git commit: Added camel-mllp component http://git-wip-us.apache.org/repos/asf/camel/blob/bd1661b2/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java new file mode 100644 index 0000000..362a30f --- /dev/null +++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java @@ -0,0 +1,550 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.mllp; + +import java.io.IOException; +import java.io.InputStream; +import java.net.InetSocketAddress; +import java.net.ServerSocket; +import java.net.Socket; +import java.net.SocketTimeoutException; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; + +import org.apache.camel.Exchange; +import org.apache.camel.ExchangePattern; +import org.apache.camel.Message; +import org.apache.camel.Processor; +import org.apache.camel.component.mllp.impl.MllpUtil; +import org.apache.camel.converter.IOConverter; +import org.apache.camel.impl.DefaultConsumer; +import org.apache.camel.processor.mllp.Hl7AcknowledgementGenerationException; +import org.apache.camel.processor.mllp.Hl7AcknowledgementGenerator; +import org.apache.camel.util.IOHelper; + +import static org.apache.camel.component.mllp.MllpConstants.*; +import static org.apache.camel.component.mllp.MllpEndpoint.SEGMENT_DELIMITER; +import static org.apache.camel.component.mllp.MllpEndpoint.START_OF_BLOCK; + +/** + * The MLLP consumer. + */ +public class MllpTcpServerConsumer extends DefaultConsumer { + ServerSocketThread serverSocketThread; + + List clientThreads = new LinkedList<>(); + + private final MllpEndpoint endpoint; + + public MllpTcpServerConsumer(MllpEndpoint endpoint, Processor processor) { + super(endpoint, processor); + log.trace("MllpTcpServerConsumer(endpoint, processor)"); + + + this.endpoint = endpoint; + + } + + @Override + protected void doStart() throws Exception { + log.debug("doStart() - creating acceptor thread"); + + ServerSocket serverSocket = new ServerSocket(); + if (null != endpoint.receiveBufferSize) { + serverSocket.setReceiveBufferSize(endpoint.receiveBufferSize); + } + + serverSocket.setReuseAddress(endpoint.reuseAddress); + + // Accept Timeout + serverSocket.setSoTimeout(endpoint.acceptTimeout); + + InetSocketAddress socketAddress = new InetSocketAddress(endpoint.getHostname(), endpoint.getPort()); + serverSocket.bind(socketAddress, endpoint.backlog); + + serverSocketThread = new ServerSocketThread(serverSocket); + serverSocketThread.start(); + + super.doStart(); + } + + @Override + protected void doStop() throws Exception { + log.debug("doStop()"); + + switch (serverSocketThread.getState()) { + case TERMINATED: + // This is what we hope for + break; + case NEW: + case RUNNABLE: + case BLOCKED: + case WAITING: + case TIMED_WAITING: + default: + serverSocketThread.interrupt(); + break; + } + + serverSocketThread = null; + + super.doStop(); + } + + @Override + protected void doSuspend() throws Exception { + log.debug("doSuspend()"); + + super.doSuspend(); + } + + @Override + protected void doResume() throws Exception { + log.debug("doResume()"); + + super.doSuspend(); + } + + @Override + protected void doShutdown() throws Exception { + log.debug("doShutdown()"); + + super.doShutdown(); + } + + + /** + * Nested Class to handle the ServerSocket.accept requests + */ + class ServerSocketThread extends Thread { + ServerSocket serverSocket; + + ServerSocketThread(ServerSocket serverSocket) { + this.setName(createThreadName(serverSocket)); + + this.serverSocket = serverSocket; + } + + /** + * Derive a thread name from the class name, the component URI and the connection information. + *

+ * The String will in the format [endpoint key] - [local socket address] + * + * @return String for thread name + */ + String createThreadName(ServerSocket serverSocket) { + // Get the classname without the package. This is a nested class, so we want the parent class name included + String fullClassName = this.getClass().getName(); + String className = fullClassName.substring(fullClassName.lastIndexOf('.') + 1); + + // Get the URI without options + String fullEndpointKey = endpoint.getEndpointKey(); + String endpointKey; + if (fullEndpointKey.contains("?")) { + endpointKey = fullEndpointKey.substring(0, fullEndpointKey.indexOf('?')); + } else { + endpointKey = fullEndpointKey; + } + + // Now put it all together + return String.format("%s[%s] - %s", className, endpointKey, serverSocket.getLocalSocketAddress()); + } + + /** + * The main ServerSocket.accept() loop + *

+ * NOTE: When a connection is received, the Socket is checked after a brief delay in an attempt to determine + * if this is a load-balancer probe. The test is done before the ClientSocketThread is created to avoid creating + * a large number of short lived threads, which is what can occur if the load balancer polling interval is very + * short. + */ + public void run() { + log.debug("Starting acceptor thread"); + + while (null != serverSocket && serverSocket.isBound() && !serverSocket.isClosed()) { + try { + /* ? set this here ? */ + // serverSocket.setSoTimeout( 10000 ); + // TODO: Need to check maxConnections and figure out what to do when exceeded + Socket socket = serverSocket.accept(); + + /* Wait a bit and then check and see if the socket is really there - it could be a load balancer + pinging the port + */ + Thread.sleep(100); + if (socket.isConnected() && !socket.isClosed()) { + log.debug("Socket appears to be there - check for available data"); + InputStream inputStream; + try { + inputStream = socket.getInputStream(); + } catch (IOException ioEx) { + // Bad Socket - + log.warn("Failed to retrieve the InputStream for socket after the initial connection was accepted"); + MllpUtil.resetConnection(socket); + continue; + } + + if (0 < inputStream.available()) { + // Something is there - start the client thread + ClientSocketThread clientThread = new ClientSocketThread(socket, null); + clientThreads.add(clientThread); + clientThread.start(); + continue; + } + + // The easy check failed - so trigger a blocking read + socket.setSoTimeout(100); + try { + int tmpByte = inputStream.read(); + socket.setSoTimeout(endpoint.responseTimeout); + if (-1 == tmpByte) { + log.debug("Socket.read() returned END_OF_STREAM - resetting connection"); + MllpUtil.resetConnection(socket); + } else { + ClientSocketThread clientThread = new ClientSocketThread(socket, tmpByte); + clientThreads.add(clientThread); + clientThread.start(); + } + } catch (SocketTimeoutException timeoutEx) { + // No data, but the socket is there + log.debug("No Data - but the socket is there. Starting ClientSocketThread"); + ClientSocketThread clientThread = new ClientSocketThread(socket, null); + clientThreads.add(clientThread); + clientThread.start(); + } + } + } catch (SocketTimeoutException timeoutEx) { + // No new clients + log.trace("SocketTimeoutException waiting for new connections - no new connections"); + + for (int i = clientThreads.size() - 1; i >= 0; --i) { + ClientSocketThread thread = clientThreads.get(i); + if (!thread.isAlive()) { + clientThreads.remove(i); + } + } + } catch (InterruptedException interruptEx) { + log.info("accept loop interrupted - closing ServerSocket"); + try { + serverSocket.close(); + } catch (Exception ex) { + log.warn("Exception encountered closing ServerSocket after InterruptedException", ex); + } + } catch (Exception ex) { + log.error("Exception accepting new connection", ex); + } + } + } + + } + + class ClientSocketThread extends Thread { + Socket clientSocket; + Hl7AcknowledgementGenerator acknowledgementGenerator = new Hl7AcknowledgementGenerator(); + + Integer initialByte; + + ClientSocketThread(Socket clientSocket, Integer initialByte) throws IOException { + this.initialByte = initialByte; + this.setName(createThreadName(clientSocket)); + this.clientSocket = clientSocket; + this.clientSocket.setKeepAlive(endpoint.keepAlive); + this.clientSocket.setTcpNoDelay(endpoint.tcpNoDelay); + if (null != endpoint.receiveBufferSize) { + this.clientSocket.setReceiveBufferSize(endpoint.receiveBufferSize); + } + if (null != endpoint.sendBufferSize) { + this.clientSocket.setSendBufferSize(endpoint.sendBufferSize); + } + this.clientSocket.setReuseAddress(endpoint.reuseAddress); + this.clientSocket.setSoLinger(false, -1); + + // Read Timeout + this.clientSocket.setSoTimeout(endpoint.responseTimeout); + + } + + /** + * derive a thread name from the class name, the component URI and the connection information + *

+ * The String will in the format [endpoint key] - [local socket address] -> [remote socket address] + * + * @return the thread name + */ + String createThreadName(Socket socket) { + // Get the classname without the package. This is a nested class, so we want the parent class name included + String fullClassName = this.getClass().getName(); + String className = fullClassName.substring(fullClassName.lastIndexOf('.') + 1); + + // Get the URI without options + String fullEndpointKey = endpoint.getEndpointKey(); + String endpointKey; + if (fullEndpointKey.contains("?")) { + endpointKey = fullEndpointKey.substring(0, fullEndpointKey.indexOf('?')); + } else { + endpointKey = fullEndpointKey; + } + + // Now put it all together + return String.format("%s[%s] - %s -> %s", className, endpointKey, socket.getLocalSocketAddress(), socket.getRemoteSocketAddress()); + } + + @Override + public void run() { + + while (null != clientSocket && clientSocket.isConnected() && !clientSocket.isClosed()) { + byte[] hl7MessageBytes = null; + // Send the message on for processing and wait for the response + log.debug("Reading data ...."); + try { + if (null != initialByte && START_OF_BLOCK == initialByte) { + hl7MessageBytes = MllpUtil.closeFrame(clientSocket); + } else { + try { + MllpUtil.openFrame(clientSocket); + } catch (SocketTimeoutException timeoutEx) { + // When thrown by openFrame, it indicates that no data was available - but no error + continue; + } + hl7MessageBytes = MllpUtil.closeFrame(clientSocket); + } + } catch (MllpException mllpEx) { + Exchange exchange = endpoint.createExchange(ExchangePattern.InOut); + exchange.setException(mllpEx); + return; + } finally { + initialByte = null; + } + + if (null == hl7MessageBytes) { + continue; + } + + log.debug("Populating the exchange with received message"); + Exchange exchange = endpoint.createExchange(ExchangePattern.InOut); + Message message = exchange.getIn(); + message.setBody(hl7MessageBytes, byte[].class); + + message.setHeader(MLLP_LOCAL_ADDRESS, clientSocket.getLocalAddress().toString()); + message.setHeader(MLLP_REMOTE_ADDRESS, clientSocket.getRemoteSocketAddress()); + + populateHl7DataHeaders(exchange, message, hl7MessageBytes); + + + log.debug("Calling processor"); + try { + getProcessor().process(exchange); + // processed the message - send the acknowledgement + + // Check BEFORE_SEND Properties + if (exchange.getProperty(MLLP_RESET_CONNECTION_BEFORE_SEND, boolean.class)) { + MllpUtil.resetConnection(clientSocket); + return; + } else if (exchange.getProperty(MLLP_CLOSE_CONNECTION_BEFORE_SEND, boolean.class)) { + MllpUtil.closeConnection(clientSocket); + } + + // Find the acknowledgement body + byte[] acknowledgementMessageBytes = exchange.getProperty(MLLP_ACKNOWLEDGEMENT, byte[].class); + String acknowledgementMessageType = null; + if (null == acknowledgementMessageBytes) { + if (!endpoint.autoAck) { + exchange.setException(new MllpInvalidAcknowledgementException("Automatic Acknowledgement is disabled and the " + + MLLP_ACKNOWLEDGEMENT + " exchange property is null or cannot be converted to byte[]")); + return; + } + + String acknowledgmentTypeProperty = exchange.getProperty(MLLP_ACKNOWLEDGEMENT_TYPE, String.class); + try { + if (null == acknowledgmentTypeProperty) { + if (null == exchange.getException()) { + acknowledgementMessageType = "AA"; + acknowledgementMessageBytes = acknowledgementGenerator.generateApplicationAcceptAcknowledgementMessage(hl7MessageBytes); + } else { + acknowledgementMessageType = "AE"; + acknowledgementMessageBytes = acknowledgementGenerator.generateApplicationErrorAcknowledgementMessage(hl7MessageBytes); + } + } else { + switch (acknowledgmentTypeProperty) { + case "AA": + acknowledgementMessageType = "AA"; + acknowledgementMessageBytes = acknowledgementGenerator.generateApplicationAcceptAcknowledgementMessage(hl7MessageBytes); + break; + case "AE": + acknowledgementMessageType = "AE"; + acknowledgementMessageBytes = acknowledgementGenerator.generateApplicationErrorAcknowledgementMessage(hl7MessageBytes); + break; + case "AR": + acknowledgementMessageType = "AR"; + acknowledgementMessageBytes = acknowledgementGenerator.generateApplicationRejectAcknowledgementMessage(hl7MessageBytes); + break; + default: + exchange.setException(new Hl7AcknowledgementGenerationException("Unsupported acknowledgment type: " + acknowledgmentTypeProperty)); + return; + } + } + } catch (Hl7AcknowledgementGenerationException ackGenerationException) { + exchange.setException(ackGenerationException); + } + } else { + final byte bM = 77; + final byte bS = 83; + final byte bA = 65; + final byte bE = 69; + final byte bR = 82; + + final byte fieldSeparator = hl7MessageBytes[3]; + // Acknowledgment is specified in exchange property - determine the acknowledgement type + for (int i = 0; i < hl7MessageBytes.length; ++i) { + if (SEGMENT_DELIMITER == i) { + if (i + 7 < hl7MessageBytes.length // Make sure we don't run off the end of the message + && bM == hl7MessageBytes[i + 1] && bS == hl7MessageBytes[i + 2] && bA == hl7MessageBytes[i + 3] && fieldSeparator == hl7MessageBytes[i + 4]) { + if (fieldSeparator != hl7MessageBytes[i + 7]) { + log.warn("MSA-1 is longer than 2-bytes - ignoring trailing bytes"); + } + // Found MSA - pull acknowledgement bytes + byte[] acknowledgmentTypeBytes = new byte[2]; + acknowledgmentTypeBytes[0] = hl7MessageBytes[i + 5]; + acknowledgmentTypeBytes[1] = hl7MessageBytes[i + 6]; + acknowledgementMessageType = IOConverter.toString(acknowledgmentTypeBytes, exchange); + + // Verify it's a valid acknowledgement code + if (bA != acknowledgmentTypeBytes[0]) { + switch (acknowledgementMessageBytes[1]) { + case bA: + case bR: + case bE: + break; + default: + log.warn("Invalid acknowledgement type [" + acknowledgementMessageType + "] found in message - should be AA, AE or AR"); + } + } + + // if the MLLP_ACKNOWLEDGEMENT_TYPE property is set on the exchange, make sure it matches + String acknowledgementTypeProperty = exchange.getProperty(MLLP_ACKNOWLEDGEMENT_TYPE, String.class); + if (null != acknowledgementTypeProperty && !acknowledgementTypeProperty.equals(acknowledgementMessageType)) { + log.warn("Acknowledgement type found in message [" + acknowledgementMessageType + "] does not match " + + MLLP_ACKNOWLEDGEMENT_TYPE + " exchange property value [" + acknowledgementTypeProperty + "] - using value found in message"); + } + } + } + } + } + + // Send the acknowledgement + log.debug("Writing Acknowledgement"); + MllpUtil.writeFramedPayload(clientSocket, acknowledgementMessageBytes); + exchange.getIn().setHeader(MLLP_ACKNOWLEDGEMENT, acknowledgementMessageBytes); + exchange.getIn().setHeader(MLLP_ACKNOWLEDGEMENT_TYPE, acknowledgementMessageType); + + // Check AFTER_SEND Properties + if (exchange.getProperty(MLLP_RESET_CONNECTION_AFTER_SEND, boolean.class)) { + MllpUtil.resetConnection(clientSocket); + return; + } else if (exchange.getProperty(MLLP_CLOSE_CONNECTION_AFTER_SEND, boolean.class)) { + MllpUtil.closeConnection(clientSocket); + } + + } catch (Exception e) { + exchange.setException(e); + } + + } + + log.info("ClientSocketThread exiting"); + + } + + private void populateHl7DataHeaders(Exchange exchange, Message message, byte[] hl7MessageBytes) { + // Find the end of the MSH and indexes of the fields in the MSH to populate message headers + final byte fieldSeparator = hl7MessageBytes[3]; + final byte componentSeparator = hl7MessageBytes[4]; + int endOfMSH = -1; + List fieldSeparatorIndexes = new ArrayList<>(10); // We need at least 10 fields to create the acknowledgment + + for (int i = 0; i < hl7MessageBytes.length; ++i) { + if (fieldSeparator == hl7MessageBytes[i]) { + fieldSeparatorIndexes.add(i); + } else if (SEGMENT_DELIMITER == hl7MessageBytes[i]) { + endOfMSH = i; + break; + } + } + + if (-1 == endOfMSH) { + // TODO: May want to throw some sort of an Exception here + log.error("Population of message headers failed - unable to find the end of the MSH segment"); + } else { + log.debug("Populating the message headers"); + Charset charset = Charset.forName(IOHelper.getCharsetName(exchange)); + + // MSH-3 + message.setHeader(MLLP_SENDING_APPLICATION, new String(hl7MessageBytes, fieldSeparatorIndexes.get(1) + 1, + fieldSeparatorIndexes.get(2) - fieldSeparatorIndexes.get(1) - 1, charset)); + // MSH-4 + message.setHeader(MLLP_SENDING_FACILITY, new String(hl7MessageBytes, fieldSeparatorIndexes.get(2) + 1, + fieldSeparatorIndexes.get(3) - fieldSeparatorIndexes.get(2) - 1, charset)); + // MSH-5 + message.setHeader(MLLP_RECEIVING_APPLICATION, new String(hl7MessageBytes, fieldSeparatorIndexes.get(3) + 1, + fieldSeparatorIndexes.get(4) - fieldSeparatorIndexes.get(3) - 1, + charset)); + // MSH-6 + message.setHeader(MLLP_RECEIVING_FACILITY, new String(hl7MessageBytes, fieldSeparatorIndexes.get(4) + 1, + fieldSeparatorIndexes.get(5) - fieldSeparatorIndexes.get(4) - 1, + charset)); + // MSH-7 + message.setHeader(MLLP_TIMESTAMP, new String(hl7MessageBytes, fieldSeparatorIndexes.get(5) + 1, + fieldSeparatorIndexes.get(6) - fieldSeparatorIndexes.get(5) - 1, charset)); + // MSH-8 + message.setHeader(MLLP_SECURITY, new String(hl7MessageBytes, fieldSeparatorIndexes.get(6) + 1, + fieldSeparatorIndexes.get(7) - fieldSeparatorIndexes.get(6) - 1, charset)); + // MSH-9 + message.setHeader(MLLP_MESSAGE_TYPE, new String(hl7MessageBytes, fieldSeparatorIndexes.get(7) + 1, + fieldSeparatorIndexes.get(8) - fieldSeparatorIndexes.get(7) - 1, charset)); + // MSH-10 + message.setHeader(MLLP_MESSAGE_CONTROL, new String(hl7MessageBytes, fieldSeparatorIndexes.get(8) + 1, + fieldSeparatorIndexes.get(9) - fieldSeparatorIndexes.get(8) - 1, charset)); + // MSH-11 + message.setHeader(MLLP_PROCESSING_ID, new String(hl7MessageBytes, fieldSeparatorIndexes.get(9) + 1, + fieldSeparatorIndexes.get(10) - fieldSeparatorIndexes.get(9) - 1, charset)); + // MSH-12 + message.setHeader(MLLP_VERSION_ID, new String(hl7MessageBytes, fieldSeparatorIndexes.get(10) + 1, + fieldSeparatorIndexes.get(11) - fieldSeparatorIndexes.get(10) - 1, charset)); + // MSH-18 + message.setHeader(MLLP_CHARSET, new String(hl7MessageBytes, fieldSeparatorIndexes.get(16) + 1, + fieldSeparatorIndexes.get(17) - fieldSeparatorIndexes.get(16) - 1, charset)); + + for (int i = fieldSeparatorIndexes.get(7) + 1; i < fieldSeparatorIndexes.get(8); ++i) { + if (componentSeparator == hl7MessageBytes[i]) { + // MSH-9.1 + message.setHeader(MLLP_EVENT_TYPE, new String(hl7MessageBytes, fieldSeparatorIndexes.get(7) + 1, + i - fieldSeparatorIndexes.get(7) - 1, charset)); + // MSH-9.2 + message.setHeader(MLLP_TRIGGER_EVENT, new String(hl7MessageBytes, i + 1, + fieldSeparatorIndexes.get(8) - i - 1, charset)); + break; + } + } + } + } + + + } +} + http://git-wip-us.apache.org/repos/asf/camel/blob/bd1661b2/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTimeoutException.java ---------------------------------------------------------------------- diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTimeoutException.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTimeoutException.java new file mode 100644 index 0000000..0e54772 --- /dev/null +++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTimeoutException.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.mllp; + +/** + * Raised when a MLLP Producer or Consumer encounter a timeout reading a message + */ +public class MllpTimeoutException extends MllpException { + public MllpTimeoutException(String message) { + super(message); + } + + public MllpTimeoutException(String message, byte[] mllpPayload) { + super(message, mllpPayload); + } + + public MllpTimeoutException(String message, Throwable cause) { + super(message, cause); + } + + public MllpTimeoutException(String message, byte[] mllpPayload, Throwable cause) { + super(message, mllpPayload, cause); + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/bd1661b2/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpWriteException.java ---------------------------------------------------------------------- diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpWriteException.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpWriteException.java new file mode 100644 index 0000000..43fe740 --- /dev/null +++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpWriteException.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.mllp; + +/** + * Raised when a MLLP Producer or consumer encounter an error transmitting data + */ +public class MllpWriteException extends MllpException { + public MllpWriteException(String message) { + super(message); + } + + public MllpWriteException(String message, byte[] mllpPayload) { + super(message, mllpPayload); + } + + public MllpWriteException(String message, Throwable cause) { + super(message, cause); + } + + public MllpWriteException(String message, byte[] mllpPayload, Throwable cause) { + super(message, mllpPayload, cause); + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/bd1661b2/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/impl/MllpUtil.java ---------------------------------------------------------------------- diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/impl/MllpUtil.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/impl/MllpUtil.java new file mode 100644 index 0000000..4fd3fbf --- /dev/null +++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/impl/MllpUtil.java @@ -0,0 +1,371 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.mllp.impl; + +import java.io.BufferedOutputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.Socket; +import java.net.SocketTimeoutException; + +import org.apache.camel.component.mllp.MllpComponent; +import org.apache.camel.component.mllp.MllpCorruptFrameException; +import org.apache.camel.component.mllp.MllpException; +import org.apache.camel.component.mllp.MllpTimeoutException; +import org.apache.camel.component.mllp.MllpWriteException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.camel.component.mllp.MllpEndpoint.END_OF_BLOCK; +import static org.apache.camel.component.mllp.MllpEndpoint.END_OF_DATA; +import static org.apache.camel.component.mllp.MllpEndpoint.END_OF_STREAM; +import static org.apache.camel.component.mllp.MllpEndpoint.START_OF_BLOCK; + + +/** + * Supplies methods to read and write messages in a MLLP Frame. + *

+ * Although the methods in the class are intended to handle HL7 v2 formatted messages, the methods do not + * depend on that format - any byte[]can be written to the Socket. Also, any byte[] can be read from the socket + * provided it has the proper MLLP Enveloping - payload>. + *

+ * NOTE: MLLP payloads are not logged unless the logging level is set to DEBUG or TRACE to avoid introducing PHI + * into the log files. Logging of PHI can be globally disabled by setting the org.apache.camel.mllp.logPHI system + * property. The property is evaluated using Boolean.parseBoolean. + *

+ */ +public final class MllpUtil { + static Logger log = LoggerFactory.getLogger(MllpUtil.class); + + private MllpUtil() { + } + + /** + * Open the MLLP frame by reading from the Socket until the begging of the frame is found. + *

+ * If any errors occur (including MLLP frame errors) while opening the frame, the socket will be closed and an + * Exception will be thrown. + * + * @param socket the Socket to read + * @throws SocketTimeoutException thrown if a timeout occurs while looking for the beginning of the MLLP frame, but + * nothing is yet available - this is NOT an error condition + * @throws MllpCorruptFrameException if the MLLP Frame is corrupted in some way + * @throws MllpException for other unexpected error conditions + */ + public static void openFrame(Socket socket) throws SocketTimeoutException, MllpCorruptFrameException, MllpException { + if (socket.isConnected() && !socket.isClosed()) { + InputStream socketInputStream = MllpUtil.getInputStream(socket); + + int readByte; + try { + readByte = socketInputStream.read(); + switch (readByte) { + case START_OF_BLOCK: + return; + case END_OF_STREAM: + resetConnection(socket); + return; + default: + // Continue on and process the out-of-frame data + } + } catch (SocketTimeoutException normaTimeoutEx) { + // Just pass this on - the caller will wrap it in a MllpTimeoutException + throw normaTimeoutEx; + } catch (IOException unexpectedException) { + log.error("Unexpected Exception occurred opening MLLP frame - resetting the connection"); + MllpUtil.resetConnection(socket); + throw new MllpException("Unexpected Exception occurred opening MLLP frame", unexpectedException); + } + + /* + From here on, we're in a bad frame state. Read what's left in the socket, close the connection and + return the out-of-frame data. + */ + ByteArrayOutputStream outOfFrameData = new ByteArrayOutputStream(); + outOfFrameData.write(readByte); + + try { + while (true) { + readByte = socketInputStream.read(); + switch (readByte) { + case END_OF_STREAM: + if (isLogPHIEnabled(log)) { + log.error("END_OF_STREAM read while looking for the beginning of the MLLP frame, and " + + "out-of-frame data had been read - resetting connection and eating out-of-frame data: {}", + outOfFrameData.toString().replace('\r', '\n')); + } else { + log.error("END_OF_STREAM read while looking for the beginning of the MLLP frame, and out-of-frame data had been read - resetting connection and eating out-of-frame data"); + } + resetConnection(socket); + + throw new MllpCorruptFrameException("END_OF_STREAM read while looking for the beginning of the MLLP frame", outOfFrameData.toByteArray()); + case START_OF_BLOCK: + if (isLogPHIEnabled(log)) { + log.warn("The beginning of the MLLP frame was preceded by out-of-frame data - eating data: {}", outOfFrameData.toString().replace('\r', '\n')); + } else { + log.warn("The beginning of the MLLP frame was preceded by out-of-frame data - eating data"); + } + + throw new MllpCorruptFrameException("The beginning of the MLLP frame was preceded by out-of-frame data", outOfFrameData.toByteArray()); + default: + // still reading out-of-frame data + outOfFrameData.write(readByte); + break; + } + } + } catch (SocketTimeoutException timeoutEx) { + if (isLogPHIEnabled(log)) { + log.error("Timeout looking for the beginning of the MLLP frame, and out-of-frame data had been read - resetting connection and eating out-of-frame data: {}", + outOfFrameData.toString().replace('\r', '\n')); + } else { + log.error("Timeout looking for the beginning of the MLLP frame, and out-of-frame data had been read - resetting connection and eating out-of-frame data"); + } + + resetConnection(socket); + + throw new MllpCorruptFrameException("Timeout looking for the beginning of the MLLP frame, and out-of-frame data had been read", outOfFrameData.toByteArray()); + } catch (IOException e) { + if (isLogPHIEnabled(log)) { + log.error("Exception encountered looking for the beginning of the MLLP frame, and out-of-frame data had been read - resetting connection and eating out-of-frame data: {}", + outOfFrameData.toString().replace('\r', '\n')); + } else { + log.error("Exception encountered looking for the beginning of the MLLP frame, and out-of-frame data had been read - resetting connection and eating out-of-frame data"); + } + + resetConnection(socket); + + throw new MllpCorruptFrameException("Exception encountered looking for the beginning of the MLLP frame, and out-of-frame data had been read", outOfFrameData.toByteArray()); + } + } + } + + /** + * Close a MLLP frame by reading from the socket until the end of the frame is found. + *

+ * The method assumes the MLLP frame has already been opened and the first byte available + * will be the first byte of the framed message. + *

+ * The method consumes the END_OF_BLOCK and END_OF_DATA bytes from the stream before returning the payload + *

+ * If any errors occur (including MLLP frame errors) while opening the frame, the socket will be closed and an + * Exception will be thrown. + * + * @param socket the Socket to be read + * @return the payload of the MLLP-Enveloped message as a byte[] + * @throws MllpTimeoutException thrown if a timeout occurs while closing the MLLP frame + * @throws MllpCorruptFrameException if the MLLP Frame is corrupted in some way + * @throws MllpException for other unexpected error conditions + */ + public static byte[] closeFrame(Socket socket) throws MllpTimeoutException, MllpCorruptFrameException, MllpException { + if (socket.isConnected() && !socket.isClosed()) { + InputStream socketInputStream = MllpUtil.getInputStream(socket); + // TODO: Come up with an intelligent way to size this stream + ByteArrayOutputStream payload = new ByteArrayOutputStream(4096); + try { + while (true) { + int readByte = socketInputStream.read(); + switch (readByte) { + case END_OF_STREAM: + if (isLogPHIEnabled(log)) { + log.error("END_OF_STREAM read while looking for the end of the MLLP frame - resetting connection and eating data: {}", payload.toString().replace('\r', '\n')); + } else { + log.error("END_OF_STREAM read while looking for the end of the MLLP frame - resetting connection and eating data"); + } + + resetConnection(socket); + + throw new MllpCorruptFrameException("END_OF_STREAM read while looking for the end of the MLLP frame", payload.size() > 0 ? payload.toByteArray() : null); + case START_OF_BLOCK: + if (isLogPHIEnabled(log)) { + log.error("A new MLLP frame was opened before the previous frame was closed - resetting connection and eating data: {}", payload.toString().replace('\r', '\n')); + } else { + log.error("A new MLLP frame was opened before the previous frame was closed - resetting connection and eating data"); + } + + resetConnection(socket); + + throw new MllpCorruptFrameException("A new MLLP frame was opened before the previous frame was closed", payload.size() > 0 ? payload.toByteArray() : null); + case END_OF_BLOCK: + if (END_OF_DATA != socketInputStream.read()) { + if (isLogPHIEnabled(log)) { + log.error("The MLLP frame was partially closed - END_OF_BLOCK was not followed by END_OF_DATA - resetting connection and eating data: {}", + payload.toString().replace('\r', '\n')); + } else { + log.error("The MLLP frame was partially closed - END_OF_BLOCK was not followed by END_OF_DATA - resetting connection and eating data"); + } + + resetConnection(socket); + + throw new MllpCorruptFrameException("The MLLP frame was partially closed - END_OF_BLOCK was not followed by END_OF_DATA", + payload.size() > 0 ? payload.toByteArray() : null); + } + return payload.toByteArray(); + default: + // log.trace( "Read Character: {}", (char)readByte ); + payload.write(readByte); + } + } + } catch (SocketTimeoutException timeoutEx) { + if (0 < payload.size()) { + if (isLogPHIEnabled(log)) { + log.error("Timeout looking for the end of the MLLP frame - resetting connection and eating data: {}", payload.toString().replace('\r', '\n')); + } else { + log.error("Timeout looking for the end of the MLLP frame - resetting connection and eating data"); + } + } else { + log.error("Timeout looking for the end of the MLLP frame - resetting connection"); + } + + resetConnection(socket); + + throw new MllpCorruptFrameException("Timeout looking for the end of the MLLP frame", payload.size() > 0 ? payload.toByteArray() : null, timeoutEx); + } catch (IOException ioEx) { + if (0 < payload.size()) { + if (isLogPHIEnabled(log)) { + log.error("Exception encountered looking for the end of the MLLP frame - resetting connection and eating data: {}", payload.toString().replace('\r', '\n')); + } else { + log.error("Exception encountered looking for the end of the MLLP frame - resetting connection and eating data"); + } + } else { + log.error("Exception encountered looking for the end of the MLLP frame - resetting connection"); + } + + resetConnection(socket); + + throw new MllpException("Exception encountered looking for the end of the MLLP frame", payload.size() > 0 ? payload.toByteArray() : null, ioEx); + } + } + + return null; + } + + /** + * Write a MLLP-Framed payload to the Socket + * + * @param socket the Socket to write the payload + * @param payload the MLLP payload + * @return true if write was successful; false otherwise + * @throws MllpWriteException if the write fails + * @throws MllpException for other unexpected error conditions + */ + public static void writeFramedPayload(Socket socket, byte[] payload) throws MllpException { + if (socket.isConnected() && !socket.isClosed()) { + OutputStream outputStream = null; + try { + outputStream = new BufferedOutputStream(socket.getOutputStream(), payload.length + 4); + } catch (IOException ioEx) { + log.error("Error Retrieving OutputStream from Socket - resetting connection"); + + resetConnection(socket); + + throw new MllpException("Error Retrieving OutputStream from Socket", ioEx); + } + + if (null != outputStream) { + try { + outputStream.write(START_OF_BLOCK); + outputStream.write(payload, 0, payload.length); + outputStream.write(END_OF_BLOCK); + outputStream.write(END_OF_DATA); + outputStream.flush(); + } catch (IOException ioEx) { + log.error("Error writing MLLP payload - resetting connection"); + + resetConnection(socket); + + throw new MllpWriteException("Error writing MLLP payload", payload, ioEx); + } + } + } + } + + public static void closeConnection(Socket socket) { + if (null != socket) { + if (!socket.isClosed()) { + try { + socket.shutdownInput(); + } catch (Exception ex) { + log.warn("Exception encountered shutting down the input stream on the client socket", ex); + } + + try { + socket.shutdownOutput(); + } catch (Exception ex) { + log.warn("Exception encountered shutting down the output stream on the client socket", ex); + } + + try { + socket.close(); + } catch (Exception ex) { + log.warn("Exception encountered closing the client socket", ex); + } + } + } + } + + public static void resetConnection(Socket socket) { + if (null != socket) { + try { + socket.setSoLinger(true, 0); + } catch (Exception ex) { + log.warn("Exception encountered setting SO_LINGER to 0 on the socket to force a reset", ex); + } + + try { + socket.close(); + } catch (Exception ex) { + log.warn("Exception encountered closing the client socket", ex); + } + + } + + } + + /** + * Retrieve the InputStream from the Socket + *

+ * Private utility method that catches IOExceptions when retrieving the InputStream + * + * @param socket Socket to get the InputStream from + * @return the InputStream for the Socket + * @throws MllpException when unexpected conditions occur + */ + private static InputStream getInputStream(Socket socket) throws MllpException { + InputStream socketInputStream = null; + try { + socketInputStream = socket.getInputStream(); + } catch (IOException ioEx) { + throw new MllpException("Error Retrieving InputStream from Socket", ioEx); + } + + return socketInputStream; + } + + private static boolean isLogPHIEnabled(Logger targetLogger) { + String logPHIProperty = System.getProperty(MllpComponent.MLLP_LOG_PHI_PROPERTY, "true"); + if (targetLogger.isDebugEnabled()) { + if (Boolean.parseBoolean(System.getProperty(MllpComponent.MLLP_LOG_PHI_PROPERTY, "true"))) { + return true; + } + } + + return false; + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/bd1661b2/components/camel-mllp/src/main/java/org/apache/camel/processor/mllp/Hl7AcknowledgementGenerationException.java ---------------------------------------------------------------------- diff --git a/components/camel-mllp/src/main/java/org/apache/camel/processor/mllp/Hl7AcknowledgementGenerationException.java b/components/camel-mllp/src/main/java/org/apache/camel/processor/mllp/Hl7AcknowledgementGenerationException.java new file mode 100644 index 0000000..c27c7f9 --- /dev/null +++ b/components/camel-mllp/src/main/java/org/apache/camel/processor/mllp/Hl7AcknowledgementGenerationException.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor.mllp; + +public class Hl7AcknowledgementGenerationException extends Exception { + private final byte[] hl7Message; + + public Hl7AcknowledgementGenerationException(String message) { + super(message); + this.hl7Message = null; + } + + public Hl7AcknowledgementGenerationException(String message, byte[] hl7Message) { + super(message); + this.hl7Message = hl7Message; + } + + public Hl7AcknowledgementGenerationException(String message, byte[] hl7Message, Throwable cause) { + super(message, cause); + this.hl7Message = hl7Message; + } + + + public byte[] getHl7Message() { + return hl7Message; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/bd1661b2/components/camel-mllp/src/main/java/org/apache/camel/processor/mllp/Hl7AcknowledgementGenerator.java ---------------------------------------------------------------------- diff --git a/components/camel-mllp/src/main/java/org/apache/camel/processor/mllp/Hl7AcknowledgementGenerator.java b/components/camel-mllp/src/main/java/org/apache/camel/processor/mllp/Hl7AcknowledgementGenerator.java new file mode 100644 index 0000000..b95264a --- /dev/null +++ b/components/camel-mllp/src/main/java/org/apache/camel/processor/mllp/Hl7AcknowledgementGenerator.java @@ -0,0 +1,150 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor.mllp; + +import java.io.ByteArrayOutputStream; +import java.util.ArrayList; +import java.util.List; + +import org.apache.camel.Exchange; +import org.apache.camel.Message; +import org.apache.camel.Processor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.camel.component.mllp.MllpConstants.MLLP_ACKNOWLEDGEMENT; +import static org.apache.camel.component.mllp.MllpConstants.MLLP_ACKNOWLEDGEMENT_TYPE; +import static org.apache.camel.component.mllp.MllpEndpoint.MESSAGE_TERMINATOR; +import static org.apache.camel.component.mllp.MllpEndpoint.SEGMENT_DELIMITER; + +/** + * A Camel Processor for generating HL7 Acknowledgements + */ +public class Hl7AcknowledgementGenerator implements Processor { + Logger log = LoggerFactory.getLogger(this.getClass()); + + String defaultNack = "MSH|^~\\&|||||||NACK||P|2.2" + SEGMENT_DELIMITER + + "MSA|AR|" + SEGMENT_DELIMITER + + MESSAGE_TERMINATOR; + + + @Override + public void process(Exchange exchange) throws Exception { + Message message = null; + if (exchange.hasOut()) { + message = exchange.getOut(); + } else { + message = exchange.getIn(); + } + + byte[] hl7Bytes = message.getMandatoryBody(byte[].class); + + byte[] acknowledgementBytes = null; + if (null == exchange.getException()) { + acknowledgementBytes = generateApplicationAcceptAcknowledgementMessage(hl7Bytes); + message.setHeader(MLLP_ACKNOWLEDGEMENT_TYPE, "AA"); + } else { + acknowledgementBytes = generateApplicationErrorAcknowledgementMessage(hl7Bytes); + message.setHeader(MLLP_ACKNOWLEDGEMENT_TYPE, "AE"); + } + + message.setHeader(MLLP_ACKNOWLEDGEMENT, acknowledgementBytes); + } + + public byte[] generateApplicationAcceptAcknowledgementMessage(byte[] hl7MessageBytes) throws Hl7AcknowledgementGenerationException { + return generateAcknowledgementMessage(hl7MessageBytes, "AA"); + } + + public byte[] generateApplicationRejectAcknowledgementMessage(byte[] hl7MessageBytes) throws Hl7AcknowledgementGenerationException { + return generateAcknowledgementMessage(hl7MessageBytes, "AR"); + } + + public byte[] generateApplicationErrorAcknowledgementMessage(byte[] hl7MessageBytes) throws Hl7AcknowledgementGenerationException { + return generateAcknowledgementMessage(hl7MessageBytes, "AE"); + } + + byte[] generateAcknowledgementMessage(byte[] hl7MessageBytes, String acknowledgementCode) throws Hl7AcknowledgementGenerationException { + if (hl7MessageBytes == null) { + throw new Hl7AcknowledgementGenerationException("Null HL7 message received for parsing operation"); + } + + final byte fieldSeparator = hl7MessageBytes[3]; + final byte componentSeparator = hl7MessageBytes[4]; + + List fieldSeparatorIndexes = new ArrayList<>(10); // We need at least 10 fields to create the acknowledgment + + // Find the end of the MSH and indexes of the fields in the MSH + int endOfMSH = -1; + for (int i = 0; i < hl7MessageBytes.length; ++i) { + if (fieldSeparator == hl7MessageBytes[i]) { + fieldSeparatorIndexes.add(i); + } else if (SEGMENT_DELIMITER == hl7MessageBytes[i]) { + endOfMSH = i; + break; + } + } + + if (-1 == endOfMSH) { + throw new Hl7AcknowledgementGenerationException("Failed to find the end of the MSH Segment while attempting to generate response", hl7MessageBytes); + } + + if (8 > fieldSeparatorIndexes.size()) { + throw new Hl7AcknowledgementGenerationException("Insufficient number of fields in after MSH-2 in MSH to generate a response - 8 are required but " + + fieldSeparatorIndexes.size() + " " + "were found", hl7MessageBytes); + } + + // Build the MSH Segment + ByteArrayOutputStream acknowledgement = new ByteArrayOutputStream(1024); + acknowledgement.write(hl7MessageBytes, 0, 8); // through MSH-2 (without trailing field separator) + acknowledgement.write(hl7MessageBytes, fieldSeparatorIndexes.get(3), fieldSeparatorIndexes.get(4) - fieldSeparatorIndexes.get(3)); // MSH-5 + acknowledgement.write(hl7MessageBytes, fieldSeparatorIndexes.get(4), fieldSeparatorIndexes.get(5) - fieldSeparatorIndexes.get(4)); // MSH-6 + acknowledgement.write(hl7MessageBytes, fieldSeparatorIndexes.get(1), fieldSeparatorIndexes.get(2) - fieldSeparatorIndexes.get(1)); // MSH-3 + acknowledgement.write(hl7MessageBytes, fieldSeparatorIndexes.get(2), fieldSeparatorIndexes.get(3) - fieldSeparatorIndexes.get(2)); // MSH-4 + acknowledgement.write(hl7MessageBytes, fieldSeparatorIndexes.get(5), fieldSeparatorIndexes.get(7) - fieldSeparatorIndexes.get(5)); // MSH-7 and MSH-8 + // Need to generate the correct MSH-9 + acknowledgement.write(fieldSeparator); + acknowledgement.write("ACK".getBytes(), 0, 3); // MSH-9.1 + int msh92start = -1; + for (int j = fieldSeparatorIndexes.get(7) + 1; j < fieldSeparatorIndexes.get(8); ++j) { + if (componentSeparator == hl7MessageBytes[j]) { + msh92start = j; + break; + } + } + + if (-1 == msh92start) { + log.warn("Didn't find component separator for MSH-9.2 - sending ACK in MSH-9"); + } else { + acknowledgement.write(hl7MessageBytes, msh92start, fieldSeparatorIndexes.get(8) - msh92start); // MSH-9.2 + } + + acknowledgement.write(hl7MessageBytes, fieldSeparatorIndexes.get(8), endOfMSH - fieldSeparatorIndexes.get(8)); // MSH-10 through the end of the MSH + acknowledgement.write(SEGMENT_DELIMITER); + + // Build the MSA Segment + acknowledgement.write("MSA".getBytes(), 0, 3); + acknowledgement.write(fieldSeparator); + acknowledgement.write(acknowledgementCode.getBytes(), 0, 2); + acknowledgement.write(hl7MessageBytes, fieldSeparatorIndexes.get(8), fieldSeparatorIndexes.get(9) - fieldSeparatorIndexes.get(8)); // MSH-10 end + acknowledgement.write(SEGMENT_DELIMITER); + + // Terminate the message + acknowledgement.write(MESSAGE_TERMINATOR); + + return acknowledgement.toByteArray(); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/bd1661b2/components/camel-mllp/src/main/resources/META-INF/services/org/apache/camel/component/mllp ---------------------------------------------------------------------- diff --git a/components/camel-mllp/src/main/resources/META-INF/services/org/apache/camel/component/mllp b/components/camel-mllp/src/main/resources/META-INF/services/org/apache/camel/component/mllp new file mode 100644 index 0000000..3aedcac --- /dev/null +++ b/components/camel-mllp/src/main/resources/META-INF/services/org/apache/camel/component/mllp @@ -0,0 +1 @@ +class=org.apache.camel.component.mllp.MllpComponent http://git-wip-us.apache.org/repos/asf/camel/blob/bd1661b2/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpProducerConsumerLoopbackTest.java ---------------------------------------------------------------------- diff --git a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpProducerConsumerLoopbackTest.java b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpProducerConsumerLoopbackTest.java new file mode 100644 index 0000000..027de7c --- /dev/null +++ b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpProducerConsumerLoopbackTest.java @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.mllp; + +import java.util.concurrent.TimeUnit; + +import org.apache.camel.CamelContext; +import org.apache.camel.EndpointInject; +import org.apache.camel.LoggingLevel; +import org.apache.camel.ProducerTemplate; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.impl.DefaultCamelContext; +import org.apache.camel.test.AvailablePortFinder; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.apache.camel.test.mllp.PassthroughProcessor; +import org.hamcrest.CoreMatchers; +import org.junit.Assert; +import org.junit.Test; + +import static org.apache.camel.test.mllp.Hl7MessageGenerator.generateMessage; + + +public class MllpProducerConsumerLoopbackTest extends CamelTestSupport { + int mllpPort = AvailablePortFinder.getNextAvailable(); + + @EndpointInject(uri = "direct://source") + ProducerTemplate source; + + @EndpointInject(uri = "mock://acknowledged") + MockEndpoint acknowledged; + + @Override + protected CamelContext createCamelContext() throws Exception { + DefaultCamelContext context = (DefaultCamelContext) super.createCamelContext(); + + context.setUseMDCLogging(true); + context.setName(this.getClass().getSimpleName()); + + return context; + } + + + @Override + protected RouteBuilder[] createRouteBuilders() throws Exception { + RouteBuilder[] builders = new RouteBuilder[2]; + final int groupInterval = 1000; + final boolean groupActiveOnly = false; + + builders[0] = new RouteBuilder() { + String routeId = "mllp-receiver"; + + public void configure() { + fromF("mllp:%d?autoAck=true", mllpPort) + .convertBodyTo(String.class) + .to(acknowledged) + .process(new PassthroughProcessor("after send to result")) + .log(LoggingLevel.DEBUG, routeId, "Receiving: ${body}") + .toF("log://%s?level=INFO&groupInterval=%d&groupActiveOnly=%b", routeId, groupInterval, groupActiveOnly); + } + }; + + builders[1] = new RouteBuilder() { + String routeId = "mllp-sender"; + + String host = "0.0.0.0"; + + public void configure() { + from(source.getDefaultEndpoint()).routeId(routeId) + .log(LoggingLevel.DEBUG, routeId, "Sending: ${body}") + .toF("mllp://%s:%d", host, mllpPort) + .setBody(header(MllpConstants.MLLP_ACKNOWLEDGEMENT)) + .toF("log://%s?level=INFO&groupInterval=%d&groupActiveOnly=%b", routeId, groupInterval, groupActiveOnly); + } + }; + + return builders; + } + + @Test + public void testLoopbackWithOneMessage() throws Exception { + String testMessage = generateMessage(); + acknowledged.expectedBodiesReceived(testMessage); + + String acknowledgement = source.requestBody((Object) testMessage, String.class); + Assert.assertThat("Should be acknowledgment for message 1", acknowledgement, CoreMatchers.containsString(String.format("MSA|AA|00001"))); + + assertMockEndpointsSatisfied(60, TimeUnit.SECONDS); + } + + @Test + public void testLoopbackMultipleMessages() throws Exception { + int messageCount = 1000; + acknowledged.expectedMessageCount(messageCount); + + for (int i = 1; i <= messageCount; ++i) { + String testMessage = generateMessage(i); + acknowledged.message(i - 1).body().isEqualTo(testMessage); + String acknowledgement = source.requestBody((Object) testMessage, String.class); + Assert.assertThat("Should be acknowledgment for message " + i, acknowledgement, CoreMatchers.containsString(String.format("MSA|AA|%05d", i))); + + } + + assertMockEndpointsSatisfied(60, TimeUnit.SECONDS); + } +} + http://git-wip-us.apache.org/repos/asf/camel/blob/bd1661b2/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpClientConsumerBlueprintTest.java ---------------------------------------------------------------------- diff --git a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpClientConsumerBlueprintTest.java b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpClientConsumerBlueprintTest.java new file mode 100644 index 0000000..e4ca285 --- /dev/null +++ b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpClientConsumerBlueprintTest.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.mllp; + +import org.apache.camel.EndpointInject; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.test.blueprint.CamelBlueprintTestSupport; +import org.junit.Ignore; + +@Ignore(value = "Not Yet Implemented") +// TODO: Implement this +public class MllpTcpClientConsumerBlueprintTest extends CamelBlueprintTestSupport { + @EndpointInject(uri = "mock://target") + MockEndpoint target; + + @Override + protected String getBlueprintDescriptor() { + return "OSGI-INF/blueprint/mllp-tcp-client-consumer.xml"; + } + + +} http://git-wip-us.apache.org/repos/asf/camel/blob/bd1661b2/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpClientProducerAcknowledgementTest.java ---------------------------------------------------------------------- diff --git a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpClientProducerAcknowledgementTest.java b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpClientProducerAcknowledgementTest.java new file mode 100644 index 0000000..af41550 --- /dev/null +++ b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpClientProducerAcknowledgementTest.java @@ -0,0 +1,138 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.mllp; + +import java.util.concurrent.TimeUnit; + +import org.apache.camel.CamelContext; +import org.apache.camel.EndpointInject; +import org.apache.camel.LoggingLevel; +import org.apache.camel.ProducerTemplate; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.impl.DefaultCamelContext; +import org.apache.camel.test.AvailablePortFinder; +import org.apache.camel.test.junit.rule.mllp.MllpServerResource; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Rule; +import org.junit.Test; + +import static org.apache.camel.test.mllp.Hl7MessageGenerator.generateMessage; + + +public class MllpTcpClientProducerAcknowledgementTest extends CamelTestSupport { + @Rule + public MllpServerResource mllpServer = new MllpServerResource(AvailablePortFinder.getNextAvailable()); + + @EndpointInject(uri = "direct://source") + ProducerTemplate source; + + @EndpointInject(uri = "mock://complete") + MockEndpoint complete; + + @EndpointInject(uri = "mock://aa-ack") + MockEndpoint accept; + @EndpointInject(uri = "mock://ae-nack") + MockEndpoint error; + @EndpointInject(uri = "mock://ar-nack") + MockEndpoint reject; + + @Override + protected CamelContext createCamelContext() throws Exception { + DefaultCamelContext context = (DefaultCamelContext) super.createCamelContext(); + + context.setUseMDCLogging(true); + context.setName(this.getClass().getSimpleName()); + + return context; + } + + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + String routeId = "mllp-sender"; + + String host = "0.0.0.0"; + int port = mllpServer.getListenPort(); + + public void configure() { + onException(MllpApplicationRejectAcknowledgementException.class) + .handled(true) + .to(reject) + .log(LoggingLevel.ERROR, routeId, "AR Acknowledgemnet"); + + onException(MllpApplicationErrorAcknowledgementException.class) + .handled(true) + .to(error) + .log(LoggingLevel.ERROR, routeId, "AE Acknowledgement"); + + onCompletion() + .onCompleteOnly() + .to(complete) + .log(LoggingLevel.DEBUG, routeId, "AA Acknowledgement"); + + from(source.getDefaultEndpoint()).routeId(routeId) + .log(LoggingLevel.INFO, routeId, "Sending Message") + .toF("mllp://%s:%d", host, port) + .log(LoggingLevel.INFO, routeId, "Received Acknowledgement") + .to(accept); + } + }; + } + + @Test + public void testApplicationAcceptAcknowledgement() throws Exception { + complete.setExpectedMessageCount(1); + accept.setExpectedMessageCount(1); + reject.setExpectedMessageCount(0); + error.setExpectedMessageCount(0); + + source.sendBody(generateMessage()); + + assertMockEndpointsSatisfied(15, TimeUnit.SECONDS); + } + + @Test + public void testApplicationRejectAcknowledgement() throws Exception { + complete.setExpectedMessageCount(1); + accept.setExpectedMessageCount(0); + reject.setExpectedMessageCount(1); + error.setExpectedMessageCount(0); + + mllpServer.setSendApplicationRejectAcknowledgementModulus(1); + + source.sendBody(generateMessage()); + + assertMockEndpointsSatisfied(15, TimeUnit.SECONDS); + } + + @Test + public void testApplicationErrorAcknowledgement() throws Exception { + complete.setExpectedMessageCount(1); + accept.setExpectedMessageCount(0); + reject.setExpectedMessageCount(0); + error.setExpectedMessageCount(1); + + mllpServer.setSendApplicationErrorAcknowledgementModulus(1); + + source.sendBody(generateMessage()); + + assertMockEndpointsSatisfied(15, TimeUnit.SECONDS); + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/bd1661b2/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpClientProducerBlueprintTest.java ---------------------------------------------------------------------- diff --git a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpClientProducerBlueprintTest.java b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpClientProducerBlueprintTest.java new file mode 100644 index 0000000..4233d80 --- /dev/null +++ b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpClientProducerBlueprintTest.java @@ -0,0 +1,123 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.mllp; + +import java.util.Dictionary; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.TimeUnit; + +import org.apache.camel.EndpointInject; +import org.apache.camel.ProducerTemplate; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.impl.DefaultComponentResolver; +import org.apache.camel.spi.ComponentResolver; +import org.apache.camel.test.blueprint.CamelBlueprintTestSupport; +import org.apache.camel.test.junit.rule.mllp.MllpServerResource; +import org.apache.camel.util.KeyValueHolder; +import org.junit.Rule; +import org.junit.Test; + +import static org.apache.camel.test.mllp.Hl7MessageGenerator.generateMessage; + +public class MllpTcpClientProducerBlueprintTest extends CamelBlueprintTestSupport { + @Rule + public MllpServerResource mllpServer = new MllpServerResource(); + + final String sourceUri = "direct://source"; + final String mockAcknowledgedUri = "mock://acknowledged"; + final String mockTimeoutUri = "mock://timeout-ex"; + final String mockAeExUri = "mock://ae-ack"; + final String mockArExUri = "mock://ar-ack"; + final String mockFrameExUri = "mock://frame-ex"; + + @EndpointInject(uri = sourceUri) + ProducerTemplate source; + @EndpointInject(uri = mockAcknowledgedUri) + MockEndpoint acknowledged; + @EndpointInject(uri = mockTimeoutUri) + MockEndpoint timeout; + @EndpointInject(uri = mockAeExUri) + MockEndpoint ae; + @EndpointInject(uri = mockArExUri) + MockEndpoint ar; + @EndpointInject(uri = mockFrameExUri) + MockEndpoint frame; + + @Override + protected String getBlueprintDescriptor() { + return "OSGI-INF/blueprint/mllp-tcp-client-producer-test.xml"; + } + + @Override + protected Properties useOverridePropertiesWithPropertiesComponent() { + Properties props = new Properties(); + + props.setProperty("sourceUri", sourceUri); + props.setProperty("acknowledgedUri", mockAcknowledgedUri); + props.setProperty("timeoutUri", mockTimeoutUri); + props.setProperty("frameErrorUri", mockFrameExUri); + props.setProperty("errorAcknowledgementUri", mockAeExUri); + props.setProperty("rejectAcknowledgementUri", mockArExUri); + + props.setProperty("mllp.port", Integer.toString(mllpServer.getListenPort())); + + return props; + } + + /* + This doesn't seem to work + @Override + protected String useOverridePropertiesWithConfigAdmin(Dictionary props) throws Exception { + + props.put("mllp.port", mllpServer.getListenPort() ); + + return "MllpTcpClientProducer"; + } + */ + + @Override + protected void addServicesOnStartup(Map> services) { + ComponentResolver testResolver = new DefaultComponentResolver(); + + services.put(ComponentResolver.class.getName(), asService(testResolver, "component", "mllp")); + } + + @Test() + public void testSendMultipleMessages() throws Exception { + int messageCount = 500; + acknowledged.setExpectedMessageCount(messageCount); + timeout.setExpectedMessageCount(0); + frame.setExpectedMessageCount(0); + ae.setExpectedMessageCount(0); + ar.setExpectedMessageCount(0); + + // Uncomment one of these lines to see the NACKs handled + // mllpServer.setSendApplicationRejectAcknowledgementModulus(10); + // mllpServer.setSendApplicationErrorAcknowledgementModulus(10); + + for (int i = 0; i < messageCount; ++i) { + log.debug("Triggering message {}", i); + Object response = source.requestBodyAndHeader(generateMessage(i), "CamelMllpMessageControlId", String.format("%05d", i)); + log.debug("response {}\n{}", i, response); + } + + assertMockEndpointsSatisfied(15, TimeUnit.SECONDS); + } + + +}