From commits-return-10422-apmail-activemq-commits-archive=activemq.apache.org@activemq.apache.org Mon Mar 16 17:25:57 2009 Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 40487 invoked from network); 16 Mar 2009 17:25:57 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 16 Mar 2009 17:25:57 -0000 Received: (qmail 56758 invoked by uid 500); 16 Mar 2009 17:25:57 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 56703 invoked by uid 500); 16 Mar 2009 17:25:57 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 56690 invoked by uid 99); 16 Mar 2009 17:25:57 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 16 Mar 2009 10:25:57 -0700 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 16 Mar 2009 17:25:46 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 5207D2388886; Mon, 16 Mar 2009 17:25:25 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r754964 [1/2] - in /activemq/sandbox/activemq-flow: ./ src/main/java/org/apache/activemq/ src/main/java/org/apache/activemq/broker/ src/main/java/org/apache/activemq/broker/openwire/ src/main/java/org/apache/activemq/broker/stomp/ src/main/... Date: Mon, 16 Mar 2009 17:25:24 -0000 To: commits@activemq.apache.org From: chirino@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20090316172525.5207D2388886@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: chirino Date: Mon Mar 16 17:25:23 2009 New Revision: 754964 URL: http://svn.apache.org/viewvc?rev=754964&view=rev Log: Added in initial bits needed to support the STOMP protocol better. Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/FrameTranslator.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/JmsFrameTranslator.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/LegacyFrameTranslator.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/BrokerTestBase.java activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/RemoteConsumer.java activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/RemoteProducer.java activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteConsumer.java activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteProducer.java activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/stomp/ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/stomp/StompBrokerTest.java activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/stomp/StompRemoteConsumer.java activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/stomp/StompRemoteProducer.java Removed: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteConsumer.java activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteProducer.java Modified: activemq/sandbox/activemq-flow/pom.xml activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/Connection.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerConnection.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/wireformat/DiscriminatableStompWireFormatFactory.java activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireBrokerTest.java Modified: activemq/sandbox/activemq-flow/pom.xml URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/pom.xml?rev=754964&r1=754963&r2=754964&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/pom.xml (original) +++ activemq/sandbox/activemq-flow/pom.xml Mon Mar 16 17:25:23 2009 @@ -37,6 +37,11 @@ org.apache.activemq activemq-core + + com.thoughtworks.xstream + xstream + true + junit Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/Connection.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/Connection.java?rev=754964&r1=754963&r2=754964&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/Connection.java (original) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/Connection.java Mon Mar 16 17:25:23 2009 @@ -16,6 +16,7 @@ */ package org.apache.activemq; +import java.beans.ExceptionListener; import java.io.IOException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -41,7 +42,9 @@ private IDispatcher dispatcher; private final AtomicBoolean stopping = new AtomicBoolean(); private ExecutorService blockingWriter; - + private ExceptionListener exceptionListener; + + public void setTransport(Transport transport) { this.transport = transport; } @@ -99,13 +102,16 @@ } } - public void onException(IOException error) { + final public void onException(IOException error) { if (!isStopping()) { onException((Exception) error); } } - public void onException(Exception error) { + final public void onException(Exception error) { + if( exceptionListener!=null ) { + exceptionListener.exceptionThrown(error); + } } public boolean isStopping(){ @@ -158,4 +164,12 @@ return transport; } + public ExceptionListener getExceptionListener() { + return exceptionListener; + } + + public void setExceptionListener(ExceptionListener exceptionListener) { + this.exceptionListener = exceptionListener; + } + } Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerConnection.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerConnection.java?rev=754964&r1=754963&r2=754964&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerConnection.java (original) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerConnection.java Mon Mar 16 17:25:23 2009 @@ -16,10 +16,15 @@ */ package org.apache.activemq.broker; +import java.beans.ExceptionListener; +import java.io.IOException; + import org.apache.activemq.Connection; import org.apache.activemq.Service; import org.apache.activemq.broker.openwire.OpenwireProtocolHandler; +import org.apache.activemq.broker.stomp.StompProtocolHandler; import org.apache.activemq.openwire.OpenWireFormat; +import org.apache.activemq.transport.stomp.StompWireFormat; import org.apache.activemq.wireformat.WireFormat; public class BrokerConnection extends Connection { @@ -34,6 +39,19 @@ public void setWireFormat(WireFormat wf); } + + public BrokerConnection() { + setExceptionListener(new ExceptionListener(){ + public void exceptionThrown(Exception error) { + error.printStackTrace(); + try { + stop(); + } catch (Exception ignore) { + } + } + }); + } + public Broker getBroker() { return broker; } @@ -53,13 +71,27 @@ protocolHandler.onCommand(command); } else { try { + + // TODO: need to make this more extensible and dynamic. Perhaps + // we should lookup the ProtocolHandler via a FactoryFinder WireFormat wf = (WireFormat) command; if( wf.getClass() == OpenWireFormat.class ) { protocolHandler = new OpenwireProtocolHandler(); - protocolHandler.setConnection(this); - protocolHandler.setWireFormat(wf); - protocolHandler.start(); + } else if( wf.getClass() == StompWireFormat.class ) { + protocolHandler = new StompProtocolHandler(); + } else { + throw new IOException("No protocol handler available for: "+wf.getClass()); } + + protocolHandler.setConnection(this); + protocolHandler.setWireFormat(wf); + protocolHandler.start(); + + setExceptionListener(new ExceptionListener(){ + public void exceptionThrown(Exception error) { + protocolHandler.onException(error); + } + }); } catch (Exception e) { onException(e); } @@ -67,19 +99,6 @@ } @Override - public void onException(Exception error) { - if( protocolHandler!=null ) { - protocolHandler.onException(error); - } else { - error.printStackTrace(); - try { - stop(); - } catch (Exception ignore) { - } - } - } - - @Override public void stop() throws Exception { super.stop(); if( protocolHandler!=null ) { Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java?rev=754964&r1=754963&r2=754964&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java (original) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java Mon Mar 16 17:25:23 2009 @@ -35,4 +35,6 @@ public T asType(Class type); + public boolean isPersistent(); + } Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java?rev=754964&r1=754963&r2=754964&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java (original) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java Mon Mar 16 17:25:23 2009 @@ -81,4 +81,8 @@ return null; } + public boolean isPersistent() { + return message.isPersistent(); + } + } Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/FrameTranslator.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/FrameTranslator.java?rev=754964&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/FrameTranslator.java (added) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/FrameTranslator.java Mon Mar 16 17:25:23 2009 @@ -0,0 +1,128 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.stomp; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import javax.jms.JMSException; + +import org.apache.activemq.broker.Destination; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQMessage; +import org.apache.activemq.transport.stomp.ProtocolException; +import org.apache.activemq.transport.stomp.Stomp; +import org.apache.activemq.transport.stomp.StompFrame; + +/** + * Implementations of this interface are used to map back and forth from Stomp + * to ActiveMQ. There are several standard mappings which are semantically the + * same, the inner class, Helper, provides functions to copy those properties + * from one to the other + */ +public interface FrameTranslator { + + ActiveMQMessage convertToOpenwireMessage(StompProtocolHandler converter, StompFrame frame) throws JMSException, ProtocolException; + StompFrame convertFromOpenwireMessage(StompProtocolHandler converter, ActiveMQMessage message) throws IOException, JMSException; + + String convertFromOpenwireDestination(StompProtocolHandler converter, ActiveMQDestination d); + ActiveMQDestination convertToOpenwireDestination(StompProtocolHandler converter, String name) throws ProtocolException; + + String convertFromDestination(StompProtocolHandler converter, Destination d) throws ProtocolException; + Destination convertToDestination(StompProtocolHandler converter, String name) throws ProtocolException; + + /** + * Helper class which holds commonly needed functions used when implementing + * FrameTranslators + */ + static final class Helper { + + private Helper() { + } + + public static void copyStandardHeadersFromMessageToFrame(StompProtocolHandler converter, ActiveMQMessage message, StompFrame command, FrameTranslator ft) throws IOException { + final Map headers = command.getHeaders(); + headers.put(Stomp.Headers.Message.DESTINATION, ft.convertFromOpenwireDestination(converter, message.getDestination())); + headers.put(Stomp.Headers.Message.MESSAGE_ID, message.getJMSMessageID()); + + if (message.getJMSCorrelationID() != null) { + headers.put(Stomp.Headers.Message.CORRELATION_ID, message.getJMSCorrelationID()); + } + headers.put(Stomp.Headers.Message.EXPIRATION_TIME, "" + message.getJMSExpiration()); + + if (message.getJMSRedelivered()) { + headers.put(Stomp.Headers.Message.REDELIVERED, "true"); + } + headers.put(Stomp.Headers.Message.PRORITY, "" + message.getJMSPriority()); + + if (message.getJMSReplyTo() != null) { + headers.put(Stomp.Headers.Message.REPLY_TO, ft.convertFromOpenwireDestination(converter, (ActiveMQDestination) message.getJMSReplyTo())); + } + headers.put(Stomp.Headers.Message.TIMESTAMP, "" + message.getJMSTimestamp()); + + if (message.getJMSType() != null) { + headers.put(Stomp.Headers.Message.TYPE, message.getJMSType()); + } + + // now lets add all the message headers + final Map properties = message.getProperties(); + if (properties != null) { + for (Map.Entry prop : properties.entrySet()) { + headers.put(prop.getKey(), "" + prop.getValue()); + } + } + } + + public static void copyStandardHeadersFromFrameToMessage(StompProtocolHandler converter, StompFrame command, ActiveMQMessage msg, FrameTranslator ft) throws ProtocolException, JMSException { + final Map headers = new HashMap(command.getHeaders()); + final String destination = headers.remove(Stomp.Headers.Send.DESTINATION); + msg.setDestination(ft.convertToOpenwireDestination(converter, destination)); + + // the standard JMS headers + msg.setJMSCorrelationID(headers.remove(Stomp.Headers.Send.CORRELATION_ID)); + + Object o = headers.remove(Stomp.Headers.Send.EXPIRATION_TIME); + if (o != null) { + msg.setJMSExpiration(Long.parseLong((String)o)); + } + + o = headers.remove(Stomp.Headers.Send.PRIORITY); + if (o != null) { + msg.setJMSPriority(Integer.parseInt((String)o)); + } + + o = headers.remove(Stomp.Headers.Send.TYPE); + if (o != null) { + msg.setJMSType((String)o); + } + + o = headers.remove(Stomp.Headers.Send.REPLY_TO); + if (o != null) { + msg.setJMSReplyTo(ft.convertToOpenwireDestination(converter, (String)o)); + } + + o = headers.remove(Stomp.Headers.Send.PERSISTENT); + if (o != null) { + msg.setPersistent("true".equals(o)); + } + + // now the general headers + msg.setProperties(headers); + } + } +} Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/JmsFrameTranslator.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/JmsFrameTranslator.java?rev=754964&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/JmsFrameTranslator.java (added) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/JmsFrameTranslator.java Mon Mar 16 17:25:23 2009 @@ -0,0 +1,175 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.stomp; + +import java.io.IOException; +import java.io.Serializable; +import java.io.StringReader; +import java.io.StringWriter; +import java.util.HashMap; +import java.util.Map; + +import javax.jms.JMSException; + +import org.apache.activemq.command.ActiveMQMapMessage; +import org.apache.activemq.command.ActiveMQMessage; +import org.apache.activemq.command.ActiveMQObjectMessage; +import org.apache.activemq.transport.stomp.ProtocolException; +import org.apache.activemq.transport.stomp.Stomp; +import org.apache.activemq.transport.stomp.StompFrame; + +import com.thoughtworks.xstream.XStream; +import com.thoughtworks.xstream.io.HierarchicalStreamReader; +import com.thoughtworks.xstream.io.HierarchicalStreamWriter; +import com.thoughtworks.xstream.io.json.JettisonMappedXmlDriver; +import com.thoughtworks.xstream.io.xml.PrettyPrintWriter; +import com.thoughtworks.xstream.io.xml.XppReader; + +/** + * Frame translator implementation that uses XStream to convert messages to and + * from XML and JSON + * + * @author Dejan Bosanac + */ +public class JmsFrameTranslator extends LegacyFrameTranslator { + + XStream xStream = null; + + public ActiveMQMessage convertToOpenwireMessage(StompProtocolHandler converter, + StompFrame command) throws JMSException, ProtocolException { + Map headers = command.getHeaders(); + ActiveMQMessage msg; + String transformation = (String) headers.get(Stomp.Headers.TRANSFORMATION); + if (headers.containsKey(Stomp.Headers.CONTENT_LENGTH) || transformation.equals(Stomp.Transformations.JMS_BYTE.toString())) { + msg = super.convertToOpenwireMessage(converter, command); + } else { + HierarchicalStreamReader in; + + try { + String text = new String(command.getContent(), "UTF-8"); + switch (Stomp.Transformations.getValue(transformation)) { + case JMS_OBJECT_XML: + in = new XppReader(new StringReader(text)); + msg = createObjectMessage(in); + break; + case JMS_OBJECT_JSON: + in = new JettisonMappedXmlDriver().createReader(new StringReader(text)); + msg = createObjectMessage(in); + break; + case JMS_MAP_XML: + in = new XppReader(new StringReader(text)); + msg = createMapMessage(in); + break; + case JMS_MAP_JSON: + in = new JettisonMappedXmlDriver().createReader(new StringReader(text)); + msg = createMapMessage(in); + break; + default: + throw new Exception("Unkown transformation: " + transformation); + } + } catch (Throwable e) { + command.getHeaders().put(Stomp.Headers.TRANSFORMATION_ERROR, e.getMessage()); + msg = super.convertToOpenwireMessage(converter, command); + } + } + FrameTranslator.Helper.copyStandardHeadersFromFrameToMessage(converter, command, msg, this); + return msg; + } + + public StompFrame convertFromOpenwireMessage(StompProtocolHandler converter, + ActiveMQMessage message) throws IOException, JMSException { + if (message.getDataStructureType() == ActiveMQObjectMessage.DATA_STRUCTURE_TYPE) { + StompFrame command = new StompFrame(); + command.setAction(Stomp.Responses.MESSAGE); + Map headers = new HashMap(25); + command.setHeaders(headers); + + FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame( + converter, message, command, this); + ActiveMQObjectMessage msg = (ActiveMQObjectMessage) message.copy(); + command.setContent(marshall(msg.getObject(), + headers.get(Stomp.Headers.TRANSFORMATION)) + .getBytes("UTF-8")); + return command; + + } else if (message.getDataStructureType() == ActiveMQMapMessage.DATA_STRUCTURE_TYPE) { + StompFrame command = new StompFrame(); + command.setAction(Stomp.Responses.MESSAGE); + Map headers = new HashMap(25); + command.setHeaders(headers); + + FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame( + converter, message, command, this); + ActiveMQMapMessage msg = (ActiveMQMapMessage) message.copy(); + command.setContent(marshall((Serializable)msg.getContentMap(), + headers.get(Stomp.Headers.TRANSFORMATION)) + .getBytes("UTF-8")); + return command; + } else { + return super.convertFromOpenwireMessage(converter, message); + } + } + + /** + * Marshalls the Object to a string using XML or JSON encoding + */ + protected String marshall(Serializable object, String transformation) + throws JMSException { + StringWriter buffer = new StringWriter(); + HierarchicalStreamWriter out; + if (transformation.toLowerCase().endsWith("json")) { + out = new JettisonMappedXmlDriver().createWriter(buffer); + } else { + out = new PrettyPrintWriter(buffer); + } + getXStream().marshal(object, out); + return buffer.toString(); + } + + protected ActiveMQObjectMessage createObjectMessage(HierarchicalStreamReader in) throws JMSException { + ActiveMQObjectMessage objMsg = new ActiveMQObjectMessage(); + Object obj = getXStream().unmarshal(in); + objMsg.setObject((Serializable) obj); + return objMsg; + } + + protected ActiveMQMapMessage createMapMessage(HierarchicalStreamReader in) throws JMSException { + ActiveMQMapMessage mapMsg = new ActiveMQMapMessage(); + Map map = (Map)getXStream().unmarshal(in); + for (String key : map.keySet()) { + mapMsg.setObject(key, map.get(key)); + } + return mapMsg; + } + + + + // Properties + // ------------------------------------------------------------------------- + public XStream getXStream() { + if (xStream == null) { + xStream = new XStream(); + } + return xStream; + } + + public void setXStream(XStream xStream) { + this.xStream = xStream; + } + + +} Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/LegacyFrameTranslator.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/LegacyFrameTranslator.java?rev=754964&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/LegacyFrameTranslator.java (added) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/LegacyFrameTranslator.java Mon Mar 16 17:25:23 2009 @@ -0,0 +1,183 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.stomp; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import javax.jms.JMSException; + +import org.apache.activemq.broker.Destination; +import org.apache.activemq.broker.Router; +import org.apache.activemq.command.ActiveMQBytesMessage; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQMessage; +import org.apache.activemq.command.ActiveMQTextMessage; +import org.apache.activemq.protobuf.AsciiBuffer; +import org.apache.activemq.transport.stomp.ProtocolException; +import org.apache.activemq.transport.stomp.Stomp; +import org.apache.activemq.transport.stomp.StompFrame; + +/** + * Implements ActiveMQ 4.0 translations + */ +public class LegacyFrameTranslator implements FrameTranslator { + + + public ActiveMQMessage convertToOpenwireMessage(StompProtocolHandler converter, StompFrame command) throws JMSException, ProtocolException { + final Map headers = command.getHeaders(); + final ActiveMQMessage msg; + if (headers.containsKey(Stomp.Headers.CONTENT_LENGTH)) { + headers.remove(Stomp.Headers.CONTENT_LENGTH); + ActiveMQBytesMessage bm = new ActiveMQBytesMessage(); + bm.writeBytes(command.getContent()); + msg = bm; + } else { + ActiveMQTextMessage text = new ActiveMQTextMessage(); + try { + text.setText(new String(command.getContent(), "UTF-8")); + } catch (Throwable e) { + throw new ProtocolException("Text could not bet set: " + e, false, e); + } + msg = text; + } + FrameTranslator.Helper.copyStandardHeadersFromFrameToMessage(converter, command, msg, this); + return msg; + } + + public StompFrame convertFromOpenwireMessage(StompProtocolHandler converter, ActiveMQMessage message) throws IOException, JMSException { + StompFrame command = new StompFrame(); + command.setAction(Stomp.Responses.MESSAGE); + Map headers = new HashMap(25); + command.setHeaders(headers); + + FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame(converter, message, command, this); + + if (message.getDataStructureType() == ActiveMQTextMessage.DATA_STRUCTURE_TYPE) { + + ActiveMQTextMessage msg = (ActiveMQTextMessage)message.copy(); + command.setContent(msg.getText().getBytes("UTF-8")); + + } else if (message.getDataStructureType() == ActiveMQBytesMessage.DATA_STRUCTURE_TYPE) { + + ActiveMQBytesMessage msg = (ActiveMQBytesMessage)message.copy(); + msg.setReadOnlyBody(true); + byte[] data = new byte[(int)msg.getBodyLength()]; + msg.readBytes(data); + + headers.put(Stomp.Headers.CONTENT_LENGTH, "" + data.length); + command.setContent(data); + } + return command; + } + + public String convertFromOpenwireDestination(StompProtocolHandler converter, ActiveMQDestination activeMQDestination) { + if (activeMQDestination == null) { + return null; + } + String physicalName = activeMQDestination.getPhysicalName(); + + String rc = converter.getCreatedTempDestinationName(activeMQDestination); + if( rc!=null ) { + return rc; + } + + StringBuffer buffer = new StringBuffer(); + if (activeMQDestination.isQueue()) { + if (activeMQDestination.isTemporary()) { + buffer.append("/remote-temp-queue/"); + } else { + buffer.append("/queue/"); + } + } else { + if (activeMQDestination.isTemporary()) { + buffer.append("/remote-temp-topic/"); + } else { + buffer.append("/topic/"); + } + } + buffer.append(physicalName); + return buffer.toString(); + } + + public ActiveMQDestination convertToOpenwireDestination(StompProtocolHandler converter, String name) throws ProtocolException { + if (name == null) { + return null; + } else if (name.startsWith("/queue/")) { + String qName = name.substring("/queue/".length(), name.length()); + return ActiveMQDestination.createDestination(qName, ActiveMQDestination.QUEUE_TYPE); + } else if (name.startsWith("/topic/")) { + String tName = name.substring("/topic/".length(), name.length()); + return ActiveMQDestination.createDestination(tName, ActiveMQDestination.TOPIC_TYPE); + } else if (name.startsWith("/remote-temp-queue/")) { + String tName = name.substring("/remote-temp-queue/".length(), name.length()); + return ActiveMQDestination.createDestination(tName, ActiveMQDestination.TEMP_QUEUE_TYPE); + } else if (name.startsWith("/remote-temp-topic/")) { + String tName = name.substring("/remote-temp-topic/".length(), name.length()); + return ActiveMQDestination.createDestination(tName, ActiveMQDestination.TEMP_TOPIC_TYPE); + } else if (name.startsWith("/temp-queue/")) { + return converter.createTempQueue(name); + } else if (name.startsWith("/temp-topic/")) { + return converter.createTempTopic(name); + } else { + throw new ProtocolException("Illegal destination name: [" + name + "] -- ActiveMQ STOMP destinations " + + "must begine with one of: /queue/ /topic/ /temp-queue/ /temp-topic/"); + } + } + + public String convertFromDestination(StompProtocolHandler converter, Destination d) throws ProtocolException { + if (d == null) { + return null; + } + + StringBuffer buffer = new StringBuffer(); + if( d.getDomain().equals(Router.QUEUE_DOMAIN) ) { + buffer.append("/queue/"); + } else if( d.getDomain().equals(Router.QUEUE_DOMAIN) ) { + buffer.append("/topic/"); + } else { + throw new ProtocolException("Illegal destination: Stomp can only handle queue or topic Domains"); + } + + buffer.append(d.getName().toString()); + return buffer.toString(); + } + + public Destination convertToDestination(StompProtocolHandler converter, String name) throws ProtocolException { + if (name == null) { + return null; + } else if (name.startsWith("/queue/")) { + String qName = name.substring("/queue/".length(), name.length()); + return new Destination.SingleDestination(Router.QUEUE_DOMAIN, new AsciiBuffer(qName)); + } else if (name.startsWith("/topic/")) { + String tName = name.substring("/topic/".length(), name.length()); + return new Destination.SingleDestination(Router.TOPIC_DOMAIN, new AsciiBuffer(tName)); + } else if (name.startsWith("/remote-temp-queue/")) { + throw new UnsupportedOperationException(); + } else if (name.startsWith("/remote-temp-topic/")) { + throw new UnsupportedOperationException(); + } else if (name.startsWith("/temp-queue/")) { + throw new UnsupportedOperationException(); + } else if (name.startsWith("/temp-topic/")) { + throw new UnsupportedOperationException(); + } else { + throw new ProtocolException("Illegal destination name: [" + name + "] -- ActiveMQ STOMP destinations " + + "must begine with one of: /queue/ /topic/ /temp-queue/ /temp-topic/"); + } + } +} Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java?rev=754964&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java (added) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java Mon Mar 16 17:25:23 2009 @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.stomp; + +import org.apache.activemq.broker.Destination; +import org.apache.activemq.broker.MessageDelivery; +import org.apache.activemq.protobuf.AsciiBuffer; +import org.apache.activemq.transport.stomp.Stomp; +import org.apache.activemq.transport.stomp.StompFrame; + +public class StompMessageDelivery implements MessageDelivery { + + private final StompFrame frame; + private Destination destination; + private Runnable completionCallback; + private String receiptId; + private int priority = Integer.MIN_VALUE; + private AsciiBuffer msgId; + + public StompMessageDelivery(StompFrame frame, Destination destiantion) { + this.frame = frame; + this.destination = destiantion; + this.frame.setAction(Stomp.Responses.MESSAGE); + this.receiptId = frame.getHeaders().remove(Stomp.Headers.RECEIPT_REQUESTED); + } + + public Destination getDestination() { + return destination; + } + + public int getFlowLimiterSize() { + return frame.getContent().length; + } + + public int getPriority() { + if( priority == Integer.MIN_VALUE ) { + String p = frame.getHeaders().get(Stomp.Headers.Message.PRORITY); + try { + priority = (p == null) ? 4 : Integer.parseInt(p); + } catch (NumberFormatException e) { + priority = 4; + } + } + return priority; + } + + public AsciiBuffer getMsgId() { + if( msgId == null ) { + String p = frame.getHeaders().get(Stomp.Headers.Message.MESSAGE_ID); + if( p!=null ) { + msgId = new AsciiBuffer(p); + } + } + return msgId; + } + + public AsciiBuffer getProducerId() { + return null; + } + + public Runnable getCompletionCallback() { + return completionCallback; + } + + public void setCompletionCallback(Runnable completionCallback) { + this.completionCallback = completionCallback; + } + + public T asType(Class type) { + if( type == StompFrame.class ) { + return type.cast(frame); + } + return null; + } + + public StompFrame getStomeFame() { + return frame; + } + + public String getReceiptId() { + return receiptId; + } + + public boolean isPersistent() { + String p = frame.getHeaders().get(Stomp.Headers.Send.PERSISTENT); + return "true".equals(p); + } + +} Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java?rev=754964&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java (added) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java Mon Mar 16 17:25:23 2009 @@ -0,0 +1,496 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.stomp; + +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.io.PrintWriter; +import java.io.UnsupportedEncodingException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.Map; + +import javax.jms.InvalidSelectorException; +import javax.jms.JMSException; + +import org.apache.activemq.WindowLimiter; +import org.apache.activemq.broker.BrokerConnection; +import org.apache.activemq.broker.DeliveryTarget; +import org.apache.activemq.broker.Destination; +import org.apache.activemq.broker.MessageDelivery; +import org.apache.activemq.broker.Router; +import org.apache.activemq.broker.BrokerConnection.ProtocolHandler; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.Message; +import org.apache.activemq.filter.BooleanExpression; +import org.apache.activemq.filter.MessageEvaluationContext; +import org.apache.activemq.flow.Flow; +import org.apache.activemq.flow.FlowController; +import org.apache.activemq.flow.IFlowController; +import org.apache.activemq.flow.IFlowDrain; +import org.apache.activemq.flow.IFlowSink; +import org.apache.activemq.flow.IFlowSource; +import org.apache.activemq.flow.ISourceController; +import org.apache.activemq.flow.SizeLimiter; +import org.apache.activemq.flow.ISinkController.FlowControllable; +import org.apache.activemq.protobuf.AsciiBuffer; +import org.apache.activemq.queue.SingleFlowRelay; +import org.apache.activemq.selector.SelectorParser; +import org.apache.activemq.transport.stomp.Stomp; +import org.apache.activemq.transport.stomp.StompFrame; +import org.apache.activemq.transport.stomp.StompSubscription; +import org.apache.activemq.util.ByteArrayOutputStream; +import org.apache.activemq.util.FactoryFinder; +import org.apache.activemq.wireformat.WireFormat; + + +public class StompProtocolHandler implements ProtocolHandler { + + interface ActionHander { + public void onStompFrame(StompFrame frame) throws Exception; + } + + protected final HashMap actionHandlers = new HashMap(); + protected final HashMap consumers = new HashMap(); + + protected final Object inboundMutex = new Object(); + protected IFlowController inboundController; + + protected BrokerConnection connection; + + // TODO: need to update the FrameTranslator to normalize to new broker API objects instead of to the openwire command set. + private final FrameTranslator translator = new LegacyFrameTranslator(); + private final FactoryFinder FRAME_TRANSLATOR_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/broker/stomp/frametranslator/"); + private SingleFlowRelay outboundQueue; + + private HashMap allSentMessageIds = new HashMap(); + + protected FrameTranslator translator(StompFrame frame) { + try { + String header = frame.getHeaders().get(Stomp.Headers.TRANSFORMATION); + if (header != null) { + return (FrameTranslator) FRAME_TRANSLATOR_FINDER.newInstance(header); + } + } catch (Exception ignore) { + } + return translator; + } + + public StompProtocolHandler() { + actionHandlers.put(Stomp.Commands.CONNECT, new ActionHander(){ + public void onStompFrame(StompFrame frame) throws Exception { + StompFrame response = new StompFrame(Stomp.Responses.CONNECTED); + connection.write(response); + } + }); + actionHandlers.put(Stomp.Commands.SEND, new ActionHander(){ + public void onStompFrame(StompFrame frame) throws Exception { + String dest = frame.getHeaders().get(Stomp.Headers.Send.DESTINATION); + Destination destination = translator(frame).convertToDestination(StompProtocolHandler.this, dest); + + StompMessageDelivery md = new StompMessageDelivery(frame, destination); + while (!inboundController.offer(md, null)) { + inboundController.waitForFlowUnblock(); + } + } + }); + actionHandlers.put(Stomp.Commands.SUBSCRIBE, new ActionHander(){ + public void onStompFrame(StompFrame frame) throws Exception { + ConsumerContext ctx = new ConsumerContext(frame); + consumers.put(ctx.stompDestination, ctx); + connection.getBroker().getRouter().bind(ctx.destination, ctx); + ack(frame); + } + }); + actionHandlers.put(Stomp.Commands.UNSUBSCRIBE, new ActionHander(){ + public void onStompFrame(StompFrame frame) throws Exception { + } + }); + actionHandlers.put(Stomp.Commands.ACK, new ActionHander(){ + public void onStompFrame(StompFrame frame) throws Exception { + frame.getHeaders().get(Stomp.Headers.Ack.MESSAGE_ID); + } + }); + actionHandlers.put(Stomp.Commands.DISCONNECT, new ActionHander(){ + public void onStompFrame(StompFrame frame) throws Exception { + } + }); + + actionHandlers.put(Stomp.Commands.ABORT_TRANSACTION, new ActionHander(){ + public void onStompFrame(StompFrame frame) throws Exception { + } + }); + actionHandlers.put(Stomp.Commands.BEGIN_TRANSACTION, new ActionHander(){ + public void onStompFrame(StompFrame frame) throws Exception { + } + }); + actionHandlers.put(Stomp.Commands.COMMIT_TRANSACTION, new ActionHander(){ + public void onStompFrame(StompFrame frame) throws Exception { + } + }); + } + + public void start() throws Exception { + // Setup the inbound processing.. + final Flow inboundFlow = new Flow("broker-"+connection.getName()+"-inbound", false); + SizeLimiter limiter = new SizeLimiter(connection.getInputWindowSize(), connection.getInputResumeThreshold()); + inboundController = new FlowController(new FlowControllableAdapter() { + public void flowElemAccepted(ISourceController controller, MessageDelivery elem) { + route(controller, elem); + } + + public String toString() { + return inboundFlow.getFlowName(); + } + }, inboundFlow, limiter, inboundMutex); + + Flow outboundFlow = new Flow("broker-"+connection.getName()+"-outbound", false); + limiter = new SizeLimiter(connection.getOutputWindowSize(), connection.getOutputWindowSize()); + outboundQueue = new SingleFlowRelay(outboundFlow, outboundFlow.getFlowName(), limiter); + outboundQueue.setDrain(new IFlowDrain() { + public void drain(final MessageDelivery message, ISourceController controller) { + StompFrame msg = message.asType(StompFrame.class); + connection.write(msg); + }; + }); + + } + + public void stop() throws Exception { + } + + public void onCommand(Object o) { + StompFrame command = (StompFrame)o; + try { + String action = command.getAction(); + ActionHander actionHander = actionHandlers.get(action); + if( actionHander == null ) { + throw new IOException("Unsupported command: "+action); + } + actionHander.onStompFrame(command); + } catch (Exception error) { + try { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + PrintWriter stream = new PrintWriter(new OutputStreamWriter(baos, "UTF-8")); + error.printStackTrace(stream); + stream.close(); + + HashMap headers = new HashMap(); + headers.put(Stomp.Headers.Error.MESSAGE, error.getMessage()); + + if (command != null) { + final String receiptId = command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED); + if (receiptId != null) { + headers.put(Stomp.Headers.Response.RECEIPT_ID, receiptId); + } + } + + StompFrame errorMessage = new StompFrame(Stomp.Responses.ERROR, headers, baos.toByteArray()); + connection.write(errorMessage); + connection.stop(); + } catch (Exception ignore) { + } + } + } + + public void onException(Exception error) { + if( !connection.isStopping() ) { + try { + + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + PrintWriter stream = new PrintWriter(new OutputStreamWriter(baos, "UTF-8")); + error.printStackTrace(stream); + stream.close(); + + sendError(error.getMessage(), baos.toByteArray()); + connection.stop(); + + } catch (Exception ignore) { + } + } + } + + // ///////////////////////////////////////////////////////////////// + // Internal Support Methods + // ///////////////////////////////////////////////////////////////// + static class FlowControllableAdapter implements FlowControllable { + public void flowElemAccepted(ISourceController controller, MessageDelivery elem) { + } + + public IFlowSink getFlowSink() { + return null; + } + + public IFlowSource getFlowSource() { + return null; + } + } + + class ConsumerContext implements DeliveryTarget { + + private BooleanExpression selector; + + private SingleFlowRelay queue; + public WindowLimiter limiter; + private FrameTranslator translator; + private String subscriptionId; + private String stompDestination; + private Destination destination; + private String ackMode; + + private LinkedHashMap sentMessageIds = new LinkedHashMap(); + + public ConsumerContext(final StompFrame subscribe) throws Exception { + translator = translator(subscribe); + + Map headers = subscribe.getHeaders(); + stompDestination = headers.get(Stomp.Headers.Subscribe.DESTINATION); + destination = translator.convertToDestination(StompProtocolHandler.this, stompDestination); + subscriptionId = headers.get(Stomp.Headers.Subscribe.ID); + + ackMode = headers.get(Stomp.Headers.Subscribe.ACK_MODE); + if (Stomp.Headers.Subscribe.AckModeValues.CLIENT.equals(ackMode)) { + ackMode = StompSubscription.CLIENT_ACK; + } else if (Stomp.Headers.Subscribe.AckModeValues.INDIVIDUAL.equals(ackMode)) { + ackMode = StompSubscription.INDIVIDUAL_ACK; + sendError(StompSubscription.INDIVIDUAL_ACK+" not supported."); + connection.stop(); + return; + } else { + ackMode = StompSubscription.AUTO_ACK; + } + + selector = parseSelector(subscribe); + + if( ackMode != StompSubscription.AUTO_ACK ) { + Flow flow = new Flow("broker-"+subscriptionId+"-outbound", false); + limiter = new WindowLimiter(true, flow, 1000, 500) { + public int getElementSize(MessageDelivery m) { + return 1; + } + }; + queue = new SingleFlowRelay(flow, flow.getFlowName(), limiter); + queue.setDrain(new IFlowDrain() { + public void drain(final MessageDelivery message, ISourceController controller) { + StompFrame frame = message.asType(StompFrame.class); + if (ackMode == StompSubscription.CLIENT_ACK || ackMode==StompSubscription.INDIVIDUAL_ACK) { + synchronized(allSentMessageIds) { + AsciiBuffer msgId = message.getMsgId(); + sentMessageIds.put(msgId, msgId); + allSentMessageIds.put(msgId, ConsumerContext.this); + } + } + connection.write(frame); + }; + }); + } else { + queue = outboundQueue; + } + + } + + public void ack(StompFrame info) throws Exception { + if (ackMode == StompSubscription.CLIENT_ACK || ackMode==StompSubscription.INDIVIDUAL_ACK) { + int credits = 0; + synchronized(allSentMessageIds) { + AsciiBuffer mid = new AsciiBuffer(info.getHeaders().get(Stomp.Headers.Ack.MESSAGE_ID)); + for (Iterator iterator = sentMessageIds.keySet().iterator(); iterator.hasNext();) { + AsciiBuffer next = iterator.next(); + iterator.remove(); + allSentMessageIds.remove(next); + credits++; + if( next.equals(mid) ) { + break; + } + } + + } + synchronized(queue) { + limiter.onProtocolCredit(credits); + } + + } else { + // We should not be getting an ACK. + sendError("ACK not expected."); + connection.stop(); + } + + } + + public IFlowSink getSink() { + return queue; + } + + public boolean match(MessageDelivery message) { + StompFrame stompMessage = message.asType(StompFrame.class); + if (stompMessage == null) { + return false; + } + + Message msg = message.asType(Message.class); + if (msg == null) { + return false; + } + + // TODO: abstract the Selector bits so that it is not openwire specific. + MessageEvaluationContext selectorContext = new MessageEvaluationContext(); + selectorContext.setMessageReference(msg); + selectorContext.setDestination(msg.getDestination()); + try { + return (selector == null || selector.matches(selectorContext)); + } catch (JMSException e) { + e.printStackTrace(); + return false; + } + } + + } + + private void sendError(String message) { + sendError(message, StompFrame.NO_DATA); + } + + private void sendError(String message, String details) { + try { + sendError(message, details.getBytes("UTF-8")); + } catch (UnsupportedEncodingException e) { + throw new RuntimeException(e); + } + } + private void sendError(String message, byte[] details) { + HashMap headers = new HashMap(); + headers.put(Stomp.Headers.Error.MESSAGE, message); + StompFrame errorMessage = new StompFrame(Stomp.Responses.ERROR, headers, details); + connection.write(errorMessage); + } + + private void ack(StompFrame frame) { + ack(frame.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED)); + } + private void ack(String receiptId) { + if (receiptId != null) { + StompFrame receipt = new StompFrame(); + receipt.setAction(Stomp.Responses.RECEIPT); + receipt.setHeaders(new HashMap(1)); + receipt.getHeaders().put(Stomp.Headers.Response.RECEIPT_ID, receiptId); + connection.write(receipt); + } + } + + protected void route(ISourceController controller, MessageDelivery messageDelivery) { + // TODO: + // Consider doing some caching of this target list. Most producers + // always send to + // the same destination. + Collection targets = connection.getBroker().getRouter().route(messageDelivery); + final StompMessageDelivery smd = ((StompMessageDelivery) messageDelivery); + String receiptId = smd.getReceiptId(); + if (targets != null) { + if (receiptId!=null) { + // We need to ack the message once we ensure we won't loose it. + // We know we won't loose it once it's persisted or delivered to + // a consumer + // Setup a callback to get notifed once one of those happens. + if (messageDelivery.isPersistent()) { + messageDelivery.setCompletionCallback(new Runnable() { + public void run() { + ack(smd.getStomeFame()); + } + }); + } else { + // Let the client know the broker got the message. + ack(smd.getStomeFame()); + } + } + + // Deliver the message to all the targets.. + for (DeliveryTarget dt : targets) { + if (dt.match(messageDelivery)) { + dt.getSink().add(messageDelivery, controller); + } + } + + } else { + // Let the client know we got the message even though there + // were no valid targets to deliver the message to. + if (receiptId!=null) { + ack(receiptId); + } + } + controller.elementDispatched(messageDelivery); + } + + static public Destination convert(ActiveMQDestination dest) { + if (dest.isComposite()) { + ActiveMQDestination[] compositeDestinations = dest.getCompositeDestinations(); + ArrayList d = new ArrayList(); + for (int i = 0; i < compositeDestinations.length; i++) { + d.add(convert(compositeDestinations[i])); + } + return new Destination.MultiDestination(d); + } + AsciiBuffer domain; + if (dest.isQueue()) { + domain = Router.QUEUE_DOMAIN; + } + if (dest.isTopic()) { + domain = Router.TOPIC_DOMAIN; + } else { + throw new IllegalArgumentException("Unsupported domain type: " + dest); + } + return new Destination.SingleDestination(domain, new AsciiBuffer(dest.getPhysicalName())); + } + + private static BooleanExpression parseSelector(StompFrame frame) throws InvalidSelectorException { + BooleanExpression rc = null; + String selector = frame.getHeaders().get(Stomp.Headers.Subscribe.SELECTOR); + if( selector !=null ) { + rc = SelectorParser.parse(selector); + } + return rc; + } + + public BrokerConnection getConnection() { + return connection; + } + + public void setConnection(BrokerConnection connection) { + this.connection = connection; + } + + public void setWireFormat(WireFormat wireFormat) { + } + + public String getCreatedTempDestinationName(ActiveMQDestination activeMQDestination) { + // TODO Auto-generated method stub + return null; + } + + public ActiveMQDestination createTempQueue(String name) { + // TODO Auto-generated method stub + return null; + } + + public ActiveMQDestination createTempTopic(String name) { + // TODO Auto-generated method stub + return null; + } + +} Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/wireformat/DiscriminatableStompWireFormatFactory.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/wireformat/DiscriminatableStompWireFormatFactory.java?rev=754964&r1=754963&r2=754964&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/wireformat/DiscriminatableStompWireFormatFactory.java (original) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/wireformat/DiscriminatableStompWireFormatFactory.java Mon Mar 16 17:25:23 2009 @@ -16,17 +16,48 @@ */ package org.apache.activemq.wireformat; +import java.io.UnsupportedEncodingException; + +import org.apache.activemq.transport.stomp.Stomp; import org.apache.activemq.transport.stomp.StompWireFormatFactory; import org.apache.activemq.util.ByteSequence; public class DiscriminatableStompWireFormatFactory extends StompWireFormatFactory implements DiscriminatableWireFormatFactory { + static byte MAGIC[] = toBytes(Stomp.Commands.CONNECT); + + static private byte[] toBytes(String value) { + try { + return value.getBytes("UTF-8"); + } catch (UnsupportedEncodingException e) { + throw new RuntimeException(e); + } + } + public boolean matchesWireformatHeader(ByteSequence byteSequence) { + byte[] data = byteSequence.data; + if( byteSequence.length >= MAGIC.length) { + int offset = byteSequence.length - MAGIC.length; + for(int i=0; i < byteSequence.length; i++) { + // Newlines are allowed to precede the STOMP connect command. + if( i < offset ) { + if( data[i]!='\n' && data[i]!='\r' ) { + return false; + } + } else { + if( data[i]!=MAGIC[i-offset] ) { + return false; + } + } + return true; + } + } return false; } + public int maxWireformatHeaderLength() { - return 100; + return MAGIC.length+10; } } Added: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/BrokerTestBase.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/BrokerTestBase.java?rev=754964&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/BrokerTestBase.java (added) +++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/BrokerTestBase.java Mon Mar 16 17:25:23 2009 @@ -0,0 +1,512 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker; + +import java.beans.ExceptionListener; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +import junit.framework.TestCase; + +import org.apache.activemq.dispatch.IDispatcher; +import org.apache.activemq.dispatch.PriorityDispatcher; +import org.apache.activemq.metric.MetricAggregator; +import org.apache.activemq.metric.Period; +import org.apache.activemq.protobuf.AsciiBuffer; +import org.apache.activemq.queue.Mapper; + +public abstract class BrokerTestBase extends TestCase { + + protected static final int PERFORMANCE_SAMPLES = 3; + + protected static final int IO_WORK_AMOUNT = 0; + protected static final int FANIN_COUNT = 10; + protected static final int FANOUT_COUNT = 10; + + protected static final int PRIORITY_LEVELS = 10; + protected static final boolean USE_INPUT_QUEUES = true; + + // Set to put senders and consumers on separate brokers. + protected boolean multibroker = false; + + // Set to mockup up ptp: + protected boolean ptp = false; + + // Set to use tcp IO + protected boolean tcp = true; + // set to force marshalling even in the NON tcp case. + protected boolean forceMarshalling = false; + + protected String sendBrokerBindURI; + protected String receiveBrokerBindURI; + protected String sendBrokerConnectURI; + protected String receiveBrokerConnectURI; + + // Set's the number of threads to use: + protected final int asyncThreadPoolSize = Runtime.getRuntime().availableProcessors(); + protected boolean usePartitionedQueue = false; + + protected int producerCount; + protected int consumerCount; + protected int destCount; + + protected MetricAggregator totalProducerRate = new MetricAggregator().name("Aggregate Producer Rate").unit("items"); + protected MetricAggregator totalConsumerRate = new MetricAggregator().name("Aggregate Consumer Rate").unit("items"); + + protected Broker sendBroker; + protected Broker rcvBroker; + protected ArrayList brokers = new ArrayList(); + protected IDispatcher dispatcher; + protected final AtomicLong msgIdGenerator = new AtomicLong(); + protected final AtomicBoolean stopping = new AtomicBoolean(); + + final ArrayList producers = new ArrayList(); + final ArrayList consumers = new ArrayList(); + + static public final Mapper KEY_MAPPER = new Mapper() { + public AsciiBuffer map(MessageDelivery element) { + return element.getMsgId(); + } + }; + static public final Mapper PARTITION_MAPPER = new Mapper() { + public Integer map(MessageDelivery element) { + // we modulo 10 to have at most 10 partitions which the producers + // gets split across. + return (int) (element.getProducerId().hashCode() % 10); + } + }; + + @Override + protected void setUp() throws Exception { + dispatcher = createDispatcher(); + dispatcher.start(); + if (tcp) { + sendBrokerBindURI = "tcp://localhost:10000?wireFormat=multi"; + receiveBrokerBindURI = "tcp://localhost:20000?wireFormat=multi"; + sendBrokerConnectURI = "tcp://localhost:10000"; + receiveBrokerConnectURI = "tcp://localhost:20000"; + } else { + if (forceMarshalling) { + sendBrokerBindURI = "pipe://SendBroker"; + receiveBrokerBindURI = "pipe://ReceiveBroker"; + } else { + sendBrokerBindURI = "pipe://SendBroker"; + receiveBrokerBindURI = "pipe://ReceiveBroker"; + } + sendBrokerConnectURI = sendBrokerBindURI; + receiveBrokerConnectURI = receiveBrokerBindURI; + } + } + + protected IDispatcher createDispatcher() { + return PriorityDispatcher.createPriorityDispatchPool("BrokerDispatcher", Broker.MAX_PRIORITY, asyncThreadPoolSize); + } + + public void test_1_1_0() throws Exception { + producerCount = 1; + destCount = 1; + + createConnections(); + + // Start 'em up. + startServices(); + try { + reportRates(); + } finally { + stopServices(); + } + } + + public void test_1_1_1() throws Exception { + producerCount = 1; + destCount = 1; + consumerCount = 1; + + createConnections(); + + // Start 'em up. + startServices(); + try { + reportRates(); + } finally { + stopServices(); + } + } + + public void test_10_1_10() throws Exception { + producerCount = FANIN_COUNT; + consumerCount = FANOUT_COUNT; + destCount = 1; + + createConnections(); + + // Start 'em up. + startServices(); + try { + reportRates(); + } finally { + stopServices(); + } + } + + public void test_10_1_1() throws Exception { + producerCount = FANIN_COUNT; + destCount = 1; + consumerCount = 1; + + createConnections(); + + // Start 'em up. + startServices(); + try { + reportRates(); + } finally { + stopServices(); + } + } + + public void test_1_1_10() throws Exception { + producerCount = 1; + destCount = 1; + consumerCount = FANOUT_COUNT; + + createConnections(); + + // Start 'em up. + startServices(); + try { + reportRates(); + } finally { + stopServices(); + } + } + + public void test_2_2_2() throws Exception { + producerCount = 2; + destCount = 2; + consumerCount = 2; + + createConnections(); + + // Start 'em up. + startServices(); + try { + reportRates(); + } finally { + stopServices(); + } + } + + public void test_10_10_10() throws Exception { + producerCount = 10; + destCount = 10; + consumerCount = 10; + + createConnections(); + + // Start 'em up. + startServices(); + try { + reportRates(); + } finally { + stopServices(); + } + } + + + /** + * Tests 2 producers sending to 1 destination with 2 consumres, but with + * consumers set to select only messages from each producer. 1 consumers is + * set to slow, the other producer should be able to send quickly. + * + * @throws Exception + */ + public void test_2_2_2_SlowConsumer() throws Exception { + producerCount = 2; + destCount = 2; + consumerCount = 2; + + createConnections(); + consumers.get(0).setThinkTime(50); + + // Start 'em up. + startServices(); + try { + reportRates(); + } finally { + stopServices(); + } + } + + public void test_2_2_2_Selector() throws Exception { + producerCount = 2; + destCount = 2; + consumerCount = 2; + + createConnections(); + + // Add properties to match producers to their consumers + for (int i = 0; i < consumerCount; i++) { + String property = "match" + i; + consumers.get(i).setSelector(property); + producers.get(i).setProperty(property); + } + + // Start 'em up. + startServices(); + try { + reportRates(); + } finally { + stopServices(); + } + } + + /** + * Test sending with 1 high priority sender. The high priority sender should + * have higher throughput than the other low priority senders. + * + * @throws Exception + */ + public void test_2_1_1_HighPriorityProducer() throws Exception { + + producerCount = 2; + destCount = 1; + consumerCount = 1; + + createConnections(); + RemoteProducer producer = producers.get(0); + producer.setPriority(1); + producer.getRate().setName("High Priority Producer Rate"); + + consumers.get(0).setThinkTime(1); + + // Start 'em up. + startServices(); + try { + + System.out.println("Checking rates for test: " + getName()); + for (int i = 0; i < PERFORMANCE_SAMPLES; i++) { + Period p = new Period(); + Thread.sleep(1000 * 5); + System.out.println(producer.getRate().getRateSummary(p)); + System.out.println(totalProducerRate.getRateSummary(p)); + System.out.println(totalConsumerRate.getRateSummary(p)); + totalProducerRate.reset(); + totalConsumerRate.reset(); + } + + } finally { + stopServices(); + } + } + + /** + * Test sending with 1 high priority sender. The high priority sender should + * have higher throughput than the other low priority senders. + * + * @throws Exception + */ + public void test_2_1_1_MixedHighPriorityProducer() throws Exception { + producerCount = 2; + destCount = 1; + consumerCount = 1; + + createConnections(); + RemoteProducer producer = producers.get(0); + producer.setPriority(1); + producer.setPriorityMod(3); + producer.getRate().setName("High Priority Producer Rate"); + + consumers.get(0).setThinkTime(1); + + // Start 'em up. + startServices(); + try { + + System.out.println("Checking rates for test: " + getName()); + for (int i = 0; i < PERFORMANCE_SAMPLES; i++) { + Period p = new Period(); + Thread.sleep(1000 * 5); + System.out.println(producer.getRate().getRateSummary(p)); + System.out.println(totalProducerRate.getRateSummary(p)); + System.out.println(totalConsumerRate.getRateSummary(p)); + totalProducerRate.reset(); + totalConsumerRate.reset(); + } + + } finally { + stopServices(); + } + } + + private void reportRates() throws InterruptedException { + System.out.println("Checking rates for test: " + getName() + ", " + (ptp ? "ptp" : "topic")); + for (int i = 0; i < PERFORMANCE_SAMPLES; i++) { + Period p = new Period(); + Thread.sleep(1000 * 5); + System.out.println(totalProducerRate.getRateSummary(p)); + System.out.println(totalConsumerRate.getRateSummary(p)); + totalProducerRate.reset(); + totalConsumerRate.reset(); + } + } + + private void createConnections() throws IOException, URISyntaxException { + + if (multibroker) { + sendBroker = createBroker("SendBroker", sendBrokerBindURI, sendBrokerConnectURI); + rcvBroker = createBroker("RcvBroker", receiveBrokerBindURI, receiveBrokerConnectURI); + brokers.add(sendBroker); + brokers.add(rcvBroker); + } else { + sendBroker = rcvBroker = createBroker("Broker", sendBrokerBindURI, sendBrokerConnectURI); + brokers.add(sendBroker); + } + + Destination[] dests = new Destination[destCount]; + + for (int i = 0; i < destCount; i++) { + Destination.SingleDestination bean = new Destination.SingleDestination(); + bean.setName(new AsciiBuffer("dest" + (i + 1))); + bean.setDomain(ptp ? Router.QUEUE_DOMAIN : Router.TOPIC_DOMAIN); + dests[i] = bean; + if (ptp) { + Queue queue = createQueue(sendBroker, dests[i]); + sendBroker.addQueue(queue); + if (multibroker) { + queue = createQueue(rcvBroker, dests[i]); + rcvBroker.addQueue(queue); + } + } + } + + for (int i = 0; i < producerCount; i++) { + Destination destination = dests[i % destCount]; + RemoteProducer producer = createProducer(i, destination); + producers.add(producer); + } + + for (int i = 0; i < consumerCount; i++) { + Destination destination = dests[i % destCount]; + RemoteConsumer consumer = createConsumer(i, destination); + consumers.add(consumer); + } + + // Create MultiBroker connections: + // if (multibroker) { + // Pipe pipe = new Pipe(); + // sendBroker.createBrokerConnection(rcvBroker, pipe); + // rcvBroker.createBrokerConnection(sendBroker, pipe.connect()); + // } + } + + private RemoteConsumer createConsumer(int i, Destination destination) throws URISyntaxException { + RemoteConsumer consumer = createConsumer(); + consumer.setExceptionListener(new ExceptionListener(){ + public void exceptionThrown(Exception error) { + if( !stopping.get() ) { + System.err.println("Consumer Async Error:"); + error.printStackTrace(); + } + } + }); + consumer.setUri(new URI(rcvBroker.getConnectUri())); + consumer.setDestination(destination); + consumer.setName("consumer" + (i + 1)); + consumer.setTotalConsumerRate(totalConsumerRate); + consumer.setDispatcher(dispatcher); + return consumer; + } + + abstract protected RemoteConsumer createConsumer(); + + private RemoteProducer createProducer(int id, Destination destination) throws URISyntaxException { + RemoteProducer producer = cerateProducer(); + producer.setExceptionListener(new ExceptionListener(){ + public void exceptionThrown(Exception error) { + if( !stopping.get() ) { + System.err.println("Producer Async Error:"); + error.printStackTrace(); + } + } + }); + producer.setUri(new URI(sendBroker.getConnectUri())); + producer.setProducerId(id + 1); + producer.setName("producer" + (id + 1)); + producer.setDestination(destination); + producer.setMessageIdGenerator(msgIdGenerator); + producer.setTotalProducerRate(totalProducerRate); + producer.setDispatcher(dispatcher); + return producer; + } + + abstract protected RemoteProducer cerateProducer(); + + private Queue createQueue(Broker broker, Destination destination) { + Queue queue = new Queue(); + queue.setBroker(broker); + queue.setDestination(destination); + queue.setKeyExtractor(KEY_MAPPER); + if (usePartitionedQueue) { + queue.setPartitionMapper(PARTITION_MAPPER); + } + return queue; + } + + private Broker createBroker(String name, String bindURI, String connectUri) { + Broker broker = new Broker(); + broker.setName(name); + broker.setBindUri(bindURI); + broker.setConnectUri(connectUri); + broker.setDispatcher(dispatcher); + return broker; + } + + private void stopServices() throws Exception { + stopping.set(true); + for (Broker broker : brokers) { + broker.stop(); + } + for (RemoteProducer connection : producers) { + connection.stop(); + } + for (RemoteConsumer connection : consumers) { + connection.stop(); + } + if (dispatcher != null) { + dispatcher.shutdown(); + } + } + + private void startServices() throws Exception { + for (Broker broker : brokers) { + broker.start(); + } + for (RemoteConsumer connection : consumers) { + connection.start(); + } + + for (RemoteProducer connection : producers) { + connection.start(); + } + } + +} Added: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/RemoteConsumer.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/RemoteConsumer.java?rev=754964&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/RemoteConsumer.java (added) +++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/RemoteConsumer.java Mon Mar 16 17:25:23 2009 @@ -0,0 +1,118 @@ +package org.apache.activemq.broker; + +import java.net.URI; +import java.util.concurrent.TimeUnit; + +import org.apache.activemq.Connection; +import org.apache.activemq.flow.ISourceController; +import org.apache.activemq.metric.MetricAggregator; +import org.apache.activemq.metric.MetricCounter; +import org.apache.activemq.transport.DispatchableTransport; +import org.apache.activemq.transport.TransportFactory; + +abstract public class RemoteConsumer extends Connection { + + protected final MetricCounter consumerRate = new MetricCounter(); + + protected MetricAggregator totalConsumerRate; + protected long thinkTime; + protected Destination destination; + protected String selector; + protected URI uri; + + private boolean schedualWait; + + public void start() throws Exception { + consumerRate.name("Consumer " + name + " Rate"); + totalConsumerRate.add(consumerRate); + + transport = TransportFactory.compositeConnect(uri); + if(transport instanceof DispatchableTransport) + { + schedualWait = true; + } + initialize(); + super.start(); + setupSubscription(); + + } + + + abstract protected void setupSubscription() throws Exception; + + protected void messageReceived(final ISourceController controller, final MessageDelivery elem) { + if( schedualWait ) { + if (thinkTime > 0) { + getDispatcher().schedule(new Runnable(){ + public void run() { + consumerRate.increment(); + controller.elementDispatched(elem); + } + }, thinkTime, TimeUnit.MILLISECONDS); + + } + else + { + consumerRate.increment(); + controller.elementDispatched(elem); + } + + } else { + if( thinkTime>0 ) { + try { + Thread.sleep(thinkTime); + } catch (InterruptedException e) { + } + } + consumerRate.increment(); + controller.elementDispatched(elem); + } + } + + public void setName(String name) { + this.name = name; + } + + public MetricAggregator getTotalConsumerRate() { + return totalConsumerRate; + } + + public void setTotalConsumerRate(MetricAggregator totalConsumerRate) { + this.totalConsumerRate = totalConsumerRate; + } + + public Destination getDestination() { + return destination; + } + + public void setDestination(Destination destination) { + this.destination = destination; + } + + public long getThinkTime() { + return thinkTime; + } + + public void setThinkTime(long thinkTime) { + this.thinkTime = thinkTime; + } + + public MetricCounter getConsumerRate() { + return consumerRate; + } + + public String getSelector() { + return selector; + } + + public void setSelector(String selector) { + this.selector = selector; + } + + public URI getUri() { + return uri; + } + + public void setUri(URI uri) { + this.uri = uri; + }} Added: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/RemoteProducer.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/RemoteProducer.java?rev=754964&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/RemoteProducer.java (added) +++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/RemoteProducer.java Mon Mar 16 17:25:23 2009 @@ -0,0 +1,200 @@ +package org.apache.activemq.broker; + +import java.net.URI; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.activemq.Connection; +import org.apache.activemq.dispatch.IDispatcher.DispatchContext; +import org.apache.activemq.dispatch.IDispatcher.Dispatchable; +import org.apache.activemq.flow.IFlowController; +import org.apache.activemq.flow.IFlowSink; +import org.apache.activemq.flow.ISinkController; +import org.apache.activemq.flow.ISinkController.FlowUnblockListener; +import org.apache.activemq.metric.MetricAggregator; +import org.apache.activemq.metric.MetricCounter; +import org.apache.activemq.transport.TransportFactory; + +abstract public class RemoteProducer extends Connection implements Dispatchable, FlowUnblockListener { + + protected final MetricCounter rate = new MetricCounter(); + + protected AtomicLong messageIdGenerator; + protected int priority; + protected int priorityMod; + protected int counter; + protected int producerId; + protected Destination destination; + protected String property; + protected MetricAggregator totalProducerRate; + protected MessageDelivery next; + protected DispatchContext dispatchContext; + protected String filler; + protected int payloadSize = 20; + protected URI uri; + + protected IFlowController outboundController; + protected IFlowSink outboundQueue; + + public void start() throws Exception { + + if( payloadSize>0 ) { + StringBuilder sb = new StringBuilder(payloadSize); + for( int i=0; i < payloadSize; ++i) { + sb.append((char)('a'+(i%26))); + } + filler = sb.toString(); + } + + rate.name("Producer " + name + " Rate"); + totalProducerRate.add(rate); + + + transport = TransportFactory.compositeConnect(uri); + initialize(); + super.start(); + + setupProducer(); + + dispatchContext = getDispatcher().register(this, name + "-client"); + dispatchContext.requestDispatch(); + + } + + abstract protected void setupProducer() throws Exception; + + abstract protected void createNextMessage(); + + public void stop() throws Exception + { + dispatchContext.close(false); + super.stop(); + } + + public void onFlowUnblocked(ISinkController controller) { + dispatchContext.requestDispatch(); + } + + public boolean dispatch() { + while(true) + { + + if(next == null) + { + createNextMessage(); + } + + //If flow controlled stop until flow control is lifted. + if(outboundController.isSinkBlocked()) + { + if(outboundController.addUnblockListener(this)) + { + return true; + } + } + + outboundQueue.add(next, null); + rate.increment(); + next = null; + } + } + + protected String createPayload() { + if( payloadSize>=0 ) { + StringBuilder sb = new StringBuilder(payloadSize); + sb.append(name); + sb.append(':'); + sb.append(++counter); + sb.append(':'); + int length = sb.length(); + if( length <= payloadSize ) { + sb.append(filler.subSequence(0, payloadSize-length)); + return sb.toString(); + } else { + return sb.substring(0, payloadSize); + } + } else { + return name+":"+(++counter); + } + } + + public void setName(String name) { + this.name = name; + } + + public AtomicLong getMessageIdGenerator() { + return messageIdGenerator; + } + + public void setMessageIdGenerator(AtomicLong msgIdGenerator) { + this.messageIdGenerator = msgIdGenerator; + } + + public int getPriority() { + return priority; + } + + public void setPriority(int msgPriority) { + this.priority = msgPriority; + } + + public int getPriorityMod() { + return priorityMod; + } + + public void setPriorityMod(int priorityMod) { + this.priorityMod = priorityMod; + } + + public int getProducerId() { + return producerId; + } + + public void setProducerId(int producerId) { + this.producerId = producerId; + } + + public Destination getDestination() { + return destination; + } + + public void setDestination(Destination destination) { + this.destination = destination; + } + + public String getProperty() { + return property; + } + + public void setProperty(String property) { + this.property = property; + } + + public MetricAggregator getTotalProducerRate() { + return totalProducerRate; + } + + public void setTotalProducerRate(MetricAggregator totalProducerRate) { + this.totalProducerRate = totalProducerRate; + } + + public MetricCounter getRate() { + return rate; + } + + public int getPayloadSize() { + return payloadSize; + } + + public void setPayloadSize(int messageSize) { + this.payloadSize = messageSize; + } + + public URI getUri() { + return uri; + } + + public void setUri(URI uri) { + this.uri = uri; + } +} +