activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r781822 [2/2] - in /activemq/sandbox/activemq-flow: activemq-all/ activemq-stomp/ activemq-stomp/src/main/java/org/apache/activemq/broker/ activemq-stomp/src/main/java/org/apache/activemq/transport/ activemq-stomp/src/main/java/org/apache/a...
Date Thu, 04 Jun 2009 18:19:19 GMT
Added: activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java?rev=781822&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java
(added)
+++ activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java
Thu Jun  4 18:19:18 2009
@@ -0,0 +1,226 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.stomp;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import javax.jms.JMSException;
+
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageDispatch;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.TransactionId;
+
+/**
+ * Keeps track of the STOMP subscription so that acking is correctly done.
+ * 
+ * @author <a href="http://hiramchirino.com">chirino</a>
+ */
+public class StompSubscription {
+
+    public static final String AUTO_ACK = Stomp.Headers.Subscribe.AckModeValues.AUTO;
+    public static final String CLIENT_ACK = Stomp.Headers.Subscribe.AckModeValues.CLIENT;
+    public static final String INDIVIDUAL_ACK = Stomp.Headers.Subscribe.AckModeValues.INDIVIDUAL;
+
+    private final ProtocolConverter protocolConverter;
+    private final String subscriptionId;
+    private final ConsumerInfo consumerInfo;
+
+    private final LinkedHashMap<MessageId, MessageDispatch> dispatchedMessage = new
LinkedHashMap<MessageId, MessageDispatch>();
+    private final LinkedList<MessageDispatch> unconsumedMessage = new LinkedList<MessageDispatch>();
+
+    private String ackMode = AUTO_ACK;
+    private ActiveMQDestination destination;
+    private String transformation;
+    
+
+    /**
+     * Creates the bookkeeping object for a single STOMP subscription.
+     *
+     * @param stompTransport converter used to send frames to the STOMP client and
+     *            commands to the broker
+     * @param subscriptionId subscription id; when null no subscription header is
+     *            added to delivered frames
+     * @param consumerInfo consumer whose destination and consumer id are copied onto
+     *            the acks this class builds
+     * @param transformation transformation name stamped on delivered messages, or null
+     */
+    public StompSubscription(ProtocolConverter stompTransport, String subscriptionId, ConsumerInfo consumerInfo, String transformation) {
+        this.transformation = transformation;
+        this.consumerInfo = consumerInfo;
+        this.subscriptionId = subscriptionId;
+        this.protocolConverter = stompTransport;
+    }
+
+    /**
+     * Forwards a broker dispatch to the STOMP client after doing the
+     * acknowledgement bookkeeping required by the current ack mode.
+     * <ul>
+     * <li>client / individual ack: the dispatch is remembered until the client acks it;</li>
+     * <li>auto ack: a standard ack for the single message is sent to the broker immediately.</li>
+     * </ul>
+     * The message is then converted to a MESSAGE frame (carrying the subscription id
+     * header when one is set) and written to the STOMP side.
+     *
+     * @param md the dispatch received from the broker
+     * @throws IOException if the frame cannot be converted or sent
+     * @throws JMSException if message properties cannot be read or written
+     */
+    void onMessageDispatch(MessageDispatch md) throws IOException, JMSException {
+        ActiveMQMessage message = (ActiveMQMessage)md.getMessage();
+
+        // Ack modes are compared by value: a reference comparison (==) only works when
+        // the caller happens to pass the very same String instance as the constant.
+        if (CLIENT_ACK.equals(ackMode) || INDIVIDUAL_ACK.equals(ackMode)) {
+            synchronized (this) {
+                dispatchedMessage.put(message.getMessageId(), md);
+            }
+        } else if (AUTO_ACK.equals(ackMode)) {
+            MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1);
+            protocolConverter.getTransportFilter().sendToActiveMQ(ack);
+        }
+
+        boolean ignoreTransformation = false;
+        if (transformation != null) {
+            // The subscription's transformation wins: make the properties writable
+            // and stamp the transformation on the message.
+            message.setReadOnlyProperties(false);
+            message.setStringProperty(Stomp.Headers.TRANSFORMATION, transformation);
+        } else if (message.getStringProperty(Stomp.Headers.TRANSFORMATION) != null) {
+            // The message carries a transformation this subscription did not ask for.
+            ignoreTransformation = true;
+        }
+
+        StompFrame command = protocolConverter.convertMessage(message, ignoreTransformation);
+
+        command.setAction(Stomp.Responses.MESSAGE);
+        if (subscriptionId != null) {
+            command.getHeaders().put(Stomp.Headers.Message.SUBSCRIPTION, subscriptionId);
+        }
+
+        protocolConverter.getTransportFilter().sendToStomp(command);
+    }
+    
+    /**
+     * Handles the abort of a STOMP transaction.
+     * <p>
+     * Every dispatched message that is not yet on the unconsumed list gets a
+     * transacted "delivered" ack sent to the broker and is appended to that list;
+     * afterwards every message on the unconsumed list is dispatched to the client again.
+     *
+     * @param transactionId transaction stamped on the generated acks
+     * @throws IOException if a redelivery cannot be sent
+     * @throws JMSException if a redelivery cannot be converted
+     */
+    synchronized void onStompAbort(TransactionId transactionId) throws IOException, JMSException {
+        // Pass 1: send a delivered-ack for everything not already tracked as unconsumed.
+        for (MessageDispatch dispatch : dispatchedMessage.values()) {
+            if (unconsumedMessage.contains(dispatch)) {
+                continue;
+            }
+            MessageAck deliveredAck = new MessageAck();
+            deliveredAck.setDestination(consumerInfo.getDestination());
+            deliveredAck.setConsumerId(consumerInfo.getConsumerId());
+            deliveredAck.setAckType(MessageAck.DELIVERED_ACK_TYPE);
+            deliveredAck.setFirstMessageId(dispatch.getMessage().getMessageId());
+            deliveredAck.setLastMessageId(dispatch.getMessage().getMessageId());
+            deliveredAck.setMessageCount(1);
+            deliveredAck.setTransactionId(transactionId);
+            protocolConverter.getTransportFilter().sendToActiveMQ(deliveredAck);
+            unconsumedMessage.add(dispatch);
+        }
+
+        // Pass 2: hand every unconsumed message to the client again.
+        for (MessageDispatch pending : unconsumedMessage) {
+            onMessageDispatch(pending);
+        }
+    }
+    
+    /**
+     * Handles the commit of a STOMP transaction.
+     * <p>
+     * Sends one transacted standard ack spanning the first through the last entry
+     * of the unconsumed list, then clears both tracking collections.
+     * <p>
+     * When no message was acked inside the transaction the unconsumed list is empty;
+     * in that case nothing is sent and no state is touched. Without this guard
+     * {@code getFirst()} would throw {@link java.util.NoSuchElementException}.
+     *
+     * @param transactionId transaction stamped on the generated ack
+     */
+    synchronized void onStompCommit(TransactionId transactionId) {
+        if (unconsumedMessage.isEmpty()) {
+            // Nothing was acked in this transaction, so there is no range to ack.
+            return;
+        }
+
+        // ack all messages
+        MessageAck ack = new MessageAck();
+        ack.setDestination(consumerInfo.getDestination());
+        ack.setConsumerId(consumerInfo.getConsumerId());
+        ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
+        ack.setFirstMessageId(unconsumedMessage.getFirst().getMessage().getMessageId());
+        ack.setLastMessageId(unconsumedMessage.getLast().getMessage().getMessageId());
+        ack.setMessageCount(unconsumedMessage.size());
+        ack.setTransactionId(transactionId);
+        protocolConverter.getTransportFilter().sendToActiveMQ(ack);
+
+        // clear lists
+        unconsumedMessage.clear();
+        dispatchedMessage.clear();
+    }
+
+    /**
+     * Builds the broker ack that corresponds to a STOMP ACK frame.
+     * <ul>
+     * <li>client ack: acknowledges every dispatched message up to and including the
+     * given one. Outside a transaction the entries are dropped from the dispatched map
+     * and a standard ack is built; inside a transaction they are kept, added to the
+     * unconsumed list and a delivered ack is built.</li>
+     * <li>individual ack: acknowledges only the given message and removes it from the
+     * dispatched map; inside a transaction it is also added to the unconsumed list.</li>
+     * </ul>
+     *
+     * @param messageId id of the message named in the ACK frame
+     * @param transactionId transaction the ack belongs to, or null when not transacted
+     * @return the ack to forward to the broker, or null if the message was never
+     *         dispatched through this subscription
+     */
+    synchronized MessageAck onStompMessageAck(String messageId, TransactionId transactionId) {
+
+        MessageId msgId = new MessageId(messageId);
+
+        if (!dispatchedMessage.containsKey(msgId)) {
+            return null;
+        }
+
+        MessageAck ack = new MessageAck();
+        ack.setDestination(consumerInfo.getDestination());
+        ack.setConsumerId(consumerInfo.getConsumerId());
+
+        // Ack modes are compared by value, not by reference.
+        if (CLIENT_ACK.equals(ackMode)) {
+            if (transactionId != null) {
+                ack.setAckType(MessageAck.DELIVERED_ACK_TYPE);
+            } else {
+                ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
+            }
+            int count = 0;
+            for (Iterator<Entry<MessageId, MessageDispatch>> iter = dispatchedMessage.entrySet().iterator(); iter.hasNext();) {
+
+                Entry<MessageId, MessageDispatch> entry = iter.next();
+                MessageId id = entry.getKey();
+                MessageDispatch msg = entry.getValue();
+
+                if (ack.getFirstMessageId() == null) {
+                    ack.setFirstMessageId(id);
+                }
+
+                if (transactionId != null) {
+                    // Keep the entry: it may have to be redelivered if the transaction aborts.
+                    if (!unconsumedMessage.contains(msg)) {
+                        unconsumedMessage.add(msg);
+                    }
+                } else {
+                    iter.remove();
+                }
+
+                count++;
+
+                if (id.equals(msgId)) {
+                    ack.setLastMessageId(id);
+                    break;
+                }
+            }
+            ack.setMessageCount(count);
+            if (transactionId != null) {
+                ack.setTransactionId(transactionId);
+            }
+        } else if (INDIVIDUAL_ACK.equals(ackMode)) {
+            ack.setAckType(MessageAck.INDIVIDUAL_ACK_TYPE);
+            ack.setMessageID(msgId);
+            // The map is keyed by MessageId; removing with the raw String never
+            // matched, so acked messages stayed in the map forever.
+            MessageDispatch acked = dispatchedMessage.remove(msgId);
+            if (transactionId != null) {
+                unconsumedMessage.add(acked);
+                ack.setTransactionId(transactionId);
+            }
+        }
+        return ack;
+    }
+    /**
+     * Returns the acknowledgement mode currently in effect; AUTO_ACK unless changed.
+     */
+    public String getAckMode() {
+        return ackMode;
+    }
+    /**
+     * Sets the acknowledgement mode. The rest of this class only reacts to
+     * AUTO_ACK, CLIENT_ACK and INDIVIDUAL_ACK.
+     */
+    public void setAckMode(String ackMode) {
+        this.ackMode = ackMode;
+    }
+    /**
+     * Returns the subscription id given at construction time (may be null).
+     */
+    public String getSubscriptionId() {
+        return subscriptionId;
+    }
+    /**
+     * Records the destination this subscription is attached to.
+     */
+    public void setDestination(ActiveMQDestination destination) {
+        this.destination = destination;
+    }
+    /**
+     * Returns the destination last passed to setDestination, or null if none was set.
+     */
+    public ActiveMQDestination getDestination() {
+        return destination;
+    }
+    /**
+     * Returns the consumer info given at construction time.
+     */
+    public ConsumerInfo getConsumerInfo() {
+        return consumerInfo;
+    }
+
+}

Added: activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompTransportFactory.java?rev=781822&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompTransportFactory.java
(added)
+++ activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompTransportFactory.java
Thu Jun  4 18:19:18 2009
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.stomp;
+
+import java.util.Map;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.BrokerServiceAware;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.tcp.TcpTransportFactory;
+import org.apache.activemq.util.IntrospectionSupport;
+import org.apache.activemq.wireformat.WireFormat;
+import org.apache.activemq.xbean.XBeanBrokerService;
+import org.springframework.context.ApplicationContext;
+
+/**
+ * A <a href="http://stomp.codehaus.org/">STOMP</a> transport factory
+ * 
+ * @version $Revision: 1.1.1.1 $
+ */
+public class StompTransportFactory extends TcpTransportFactory implements BrokerServiceAware
{
+
+	private ApplicationContext applicationContext = null;
+	
+    protected String getDefaultWireFormatType() {
+        return "stomp";
+    }
+
+    public Transport compositeConfigure(Transport transport, WireFormat format, Map options)
{
+        transport = new StompTransportFilter(transport, new LegacyFrameTranslator(), applicationContext);
+        IntrospectionSupport.setProperties(transport, options);
+        return super.compositeConfigure(transport, format, options);
+    }
+
+    protected boolean isUseInactivityMonitor(Transport transport) {
+        // lets disable the inactivity monitor as stomp does not use keep alive
+        // packets
+        return false;
+    }
+
+	public void setBrokerService(BrokerService brokerService) {
+		if (brokerService instanceof XBeanBrokerService) {
+			this.applicationContext = ((XBeanBrokerService)brokerService).getApplicationContext();
+		}
+	}
+}

Added: activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompTransportFilter.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompTransportFilter.java?rev=781822&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompTransportFilter.java
(added)
+++ activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompTransportFilter.java
Thu Jun  4 18:19:18 2009
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.stomp;
+
+import java.io.IOException;
+
+import javax.jms.JMSException;
+
+import org.apache.activemq.command.Command;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportFilter;
+import org.apache.activemq.transport.TransportListener;
+import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.springframework.context.ApplicationContext;
+
+/**
+ * The StompTransportFilter normally sits on top of a TcpTransport that has been
+ * configured with the StompWireFormat and is used to convert STOMP commands to
+ * ActiveMQ commands. All of the conversion work is done by delegating to the
+ * ProtocolConverter.
+ * 
+ * @author <a href="http://hiramchirino.com">chirino</a>
+ */
+public class StompTransportFilter extends TransportFilter {
+    private static final Log LOG = LogFactory.getLog(StompTransportFilter.class);
+    private final ProtocolConverter protocolConverter;
+    private final FrameTranslator frameTranslator;
+
+    /** When true, frames received and sent are written to the trace log. */
+    private boolean trace;
+
+    /**
+     * Creates the filter on top of the given transport and builds the
+     * ProtocolConverter that does the actual STOMP/ActiveMQ conversion.
+     */
+    public StompTransportFilter(Transport next, FrameTranslator translator, ApplicationContext applicationContext) {
+        super(next);
+        this.frameTranslator = translator;
+        this.protocolConverter = new ProtocolConverter(this, translator, applicationContext);
+    }
+
+    /**
+     * Passes an ActiveMQ command to the protocol converter; a JMSException is
+     * rethrown as an IOException.
+     */
+    public void oneway(Object o) throws IOException {
+        final Command activeMqCommand = (Command)o;
+        try {
+            protocolConverter.onActiveMQCommand(activeMqCommand);
+        } catch (JMSException e) {
+            throw IOExceptionSupport.create(e);
+        }
+    }
+
+    /**
+     * Passes a received STOMP frame to the protocol converter; failures are routed
+     * to onException instead of being thrown.
+     */
+    public void onCommand(Object command) {
+        try {
+            if (trace) {
+                LOG.trace("Received: \n" + command);
+            }
+            StompFrame frame = (StompFrame)command;
+            protocolConverter.onStompCommand(frame);
+        } catch (IOException e) {
+            onException(e);
+        } catch (JMSException e) {
+            onException(IOExceptionSupport.create(e));
+        }
+    }
+
+    /**
+     * Delivers a command to the transport listener, if one is registered.
+     */
+    public void sendToActiveMQ(Command command) {
+        // Read the field once so the null check and the call see the same listener.
+        TransportListener listener = transportListener;
+        if (listener == null) {
+            return;
+        }
+        listener.onCommand(command);
+    }
+
+    /**
+     * Writes a STOMP frame to the underlying transport, if there is one.
+     */
+    public void sendToStomp(StompFrame command) throws IOException {
+        if (trace) {
+            LOG.trace("Sending: \n" + command);
+        }
+        // Read the field once so the null check and the call see the same transport.
+        Transport target = next;
+        if (target == null) {
+            return;
+        }
+        target.oneway(command);
+    }
+
+    public FrameTranslator getFrameTranslator() {
+        return frameTranslator;
+    }
+
+    public boolean isTrace() {
+        return trace;
+    }
+
+    public void setTrace(boolean trace) {
+        this.trace = trace;
+    }
+}

Added: activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java?rev=781822&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java
(added)
+++ activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java
Thu Jun  4 18:19:18 2009
@@ -0,0 +1,221 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.stomp;
+
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.activemq.transport.InactivityMonitor;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.util.ByteArrayInputStream;
+import org.apache.activemq.util.ByteArrayOutputStream;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.wireformat.WireFormat;
+
+/**
+ * Implements marshalling and unmarshalling the <a
+ * href="http://stomp.codehaus.org/">Stomp</a> protocol.
+ */
+public class StompWireFormat implements WireFormat {
+
+    private static final byte[] NO_DATA = new byte[] {};
+    private static final byte[] END_OF_FRAME = new byte[] {0, '\n'};
+
+    private static final int MAX_COMMAND_LENGTH = 1024;
+    private static final int MAX_HEADER_LENGTH = 1024 * 10;
+    private static final int MAX_HEADERS = 1000;
+    private static final int MAX_DATA_LENGTH = 1024 * 1024 * 100;
+
+    private int version = 1;
+
+    /**
+     * Marshals a frame into an in-memory byte sequence by delegating to
+     * {@code marshal(Object, DataOutput)}.
+     */
+    public ByteSequence marshal(Object command) throws IOException {
+        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
+        DataOutputStream out = new DataOutputStream(bytes);
+        marshal(command, out);
+        out.close();
+        return bytes.toByteSequence();
+    }
+
+    /**
+     * Unmarshals a frame from a byte sequence by delegating to
+     * {@code unmarshal(DataInput)}.
+     */
+    public Object unmarshal(ByteSequence packet) throws IOException {
+        DataInputStream source = new DataInputStream(new ByteArrayInputStream(packet));
+        return unmarshal(source);
+    }
+
+    /**
+     * Writes a frame in STOMP wire form: the action line, one line per header,
+     * an empty line, the content bytes and the end-of-frame marker.
+     *
+     * @param command the StompFrame to write
+     * @param os destination of the encoded bytes
+     */
+    public void marshal(Object command, DataOutput os) throws IOException {
+        StompFrame frame = (org.apache.activemq.transport.stomp.StompFrame)command;
+
+        StringBuilder head = new StringBuilder();
+        head.append(frame.getAction());
+        head.append(Stomp.NEWLINE);
+
+        // One "name<separator>value" line per header.
+        for (Object item : frame.getHeaders().entrySet()) {
+            Map.Entry header = (Map.Entry)item;
+            head.append(header.getKey());
+            head.append(Stomp.Headers.SEPERATOR);
+            head.append(header.getValue());
+            head.append(Stomp.NEWLINE);
+        }
+
+        // The empty line separates the headers from the content.
+        head.append(Stomp.NEWLINE);
+
+        os.write(head.toString().getBytes("UTF-8"));
+        os.write(frame.getContent());
+        os.write(END_OF_FRAME);
+    }
+
+    /**
+     * Reads one STOMP frame from the input.
+     * <p>
+     * Blank lines before the action are skipped. Header lines are read up to the
+     * first blank line. The body is either exactly {@code content-length} bytes
+     * followed by a null byte, or, when no content-length header is present,
+     * everything up to the next null byte.
+     *
+     * @param in source of the frame bytes
+     * @return the parsed StompFrame, or a StompFrameError wrapping the
+     *         ProtocolException when the input violates the protocol or a limit
+     * @throws IOException if reading from the input fails
+     */
+    public Object unmarshal(DataInput in) throws IOException {
+
+        try {
+            String action = null;
+
+            // skip white space to next real action line
+            while (true) {
+                action = readLine(in, MAX_COMMAND_LENGTH, "The maximum command length was exceeded");
+                if (action == null) {
+                    throw new IOException("connection was closed");
+                } else {
+                    action = action.trim();
+                    if (action.length() > 0) {
+                        break;
+                    }
+                }
+            }
+
+            // Parse the headers
+            HashMap<String, String> headers = new HashMap<String, String>(25);
+            while (true) {
+                String line = readLine(in, MAX_HEADER_LENGTH, "The maximum header length was exceeded");
+                if (line != null && line.trim().length() > 0) {
+
+                    if (headers.size() > MAX_HEADERS) {
+                        throw new ProtocolException("The maximum number of headers was exceeded", true);
+                    }
+
+                    // A header line without a separator is malformed; report it
+                    // explicitly rather than via a caught substring() failure.
+                    int seperatorIndex = line.indexOf(Stomp.Headers.SEPERATOR);
+                    if (seperatorIndex < 0) {
+                        throw new ProtocolException("Unable to parser header line [" + line + "]", true);
+                    }
+                    String name = line.substring(0, seperatorIndex).trim();
+                    String value = line.substring(seperatorIndex + 1, line.length()).trim();
+                    headers.put(name, value);
+                } else {
+                    break;
+                }
+            }
+
+            // Read in the data part.
+            byte[] data = NO_DATA;
+            String contentLength = headers.get(Stomp.Headers.CONTENT_LENGTH);
+            if (contentLength != null) {
+
+                // Bless the client, he's telling us how much data to read in.
+                int length;
+                try {
+                    length = Integer.parseInt(contentLength.trim());
+                } catch (NumberFormatException e) {
+                    throw new ProtocolException("Specified content-length is not a valid integer", true);
+                }
+
+                // The header comes from the peer: a negative value would otherwise
+                // blow up in the array allocation with an unchecked exception.
+                if (length < 0) {
+                    throw new ProtocolException("Specified content-length must not be negative", true);
+                }
+
+                if (length > MAX_DATA_LENGTH) {
+                    throw new ProtocolException("The maximum data length was exceeded", true);
+                }
+
+                data = new byte[length];
+                in.readFully(data);
+
+                if (in.readByte() != 0) {
+                    throw new ProtocolException(Stomp.Headers.CONTENT_LENGTH + " bytes were read and " + "there was no trailing null byte", true);
+                }
+
+            } else {
+
+                // We don't know how much to read.. data ends when we hit a 0
+                byte b;
+                ByteArrayOutputStream baos = null;
+                while ((b = in.readByte()) != 0) {
+
+                    if (baos == null) {
+                        baos = new ByteArrayOutputStream();
+                    } else if (baos.size() > MAX_DATA_LENGTH) {
+                        throw new ProtocolException("The maximum data length was exceeded", true);
+                    }
+
+                    baos.write(b);
+                }
+
+                if (baos != null) {
+                    baos.close();
+                    data = baos.toByteArray();
+                }
+
+            }
+
+            return new StompFrame(action, headers, data);
+
+        } catch (ProtocolException e) {
+            return new StompFrameError(e);
+        }
+
+    }
+
+    /**
+     * Reads bytes up to (not including) the next newline and decodes them as UTF-8.
+     *
+     * @param in source of the bytes
+     * @param maxLength size limit for the line; also the initial buffer capacity
+     * @param errorMessage message of the ProtocolException thrown when the limit is exceeded
+     * @return the decoded line without the terminating newline
+     */
+    private String readLine(DataInput in, int maxLength, String errorMessage) throws IOException {
+        ByteArrayOutputStream lineBytes = new ByteArrayOutputStream(maxLength);
+        for (byte next = in.readByte(); next != '\n'; next = in.readByte()) {
+            if (lineBytes.size() > maxLength) {
+                throw new ProtocolException(errorMessage, true);
+            }
+            lineBytes.write(next);
+        }
+        lineBytes.close();
+        ByteSequence line = lineBytes.toByteSequence();
+        return new String(line.getData(), line.getOffset(), line.getLength(), "UTF-8");
+    }
+    /**
+     * Returns the wire format version; 1 unless changed through setVersion.
+     */
+    public int getVersion() {
+        return version;
+    }
+    /**
+     * Stores the wire format version. Nothing else in this class reads the value.
+     */
+    public void setVersion(int version) {
+        this.version = version;
+    }
+    /**
+     * Always reports false; receive tracking is not implemented (see TODO below).
+     */
+	public boolean inReceive() {
+		//TODO implement the inactivity monitor
+		return false;
+	}
+
+    /**
+     * Wraps the transport in an InactivityMonitor when the transport asks for one;
+     * otherwise returns the transport unchanged. The options map is not consulted.
+     */
+    public Transport createTransportFilters(Transport transport, Map options) {
+        if (!transport.isUseInactivityMonitor()) {
+            return transport;
+        }
+        transport = new InactivityMonitor(transport, this);
+        return transport;
+    }
+    
+    
+
+}

Added: activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompWireFormatFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompWireFormatFactory.java?rev=781822&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompWireFormatFactory.java
(added)
+++ activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompWireFormatFactory.java
Thu Jun  4 18:19:18 2009
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.stomp;
+
+import org.apache.activemq.wireformat.WireFormat;
+import org.apache.activemq.wireformat.WireFormatFactory;
+
+/**
+ * Creates WireFormat objects that marshal the
+ * <a href="http://stomp.codehaus.org/">Stomp</a> protocol.
+ * Each call to createWireFormat() returns a new StompWireFormat instance.
+ */
+public class StompWireFormatFactory implements WireFormatFactory {
+    public WireFormat createWireFormat() {
+        return new StompWireFormat();
+    }
+}

Added: activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/package.html
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/package.html?rev=781822&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/package.html
(added)
+++ activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/package.html
Thu Jun  4 18:19:18 2009
@@ -0,0 +1,26 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+   
+    http://www.apache.org/licenses/LICENSE-2.0
+   
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<html>
+<head>
+</head>
+<body>
+
+An implementation of the Stomp protocol, which is a simple wire protocol for writing clients
+for ActiveMQ in different languages like Ruby, Python, PHP, C, etc.
+
+</body>
+</html>

Added: activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/xbean/XBeanBrokerService.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/xbean/XBeanBrokerService.java?rev=781822&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/xbean/XBeanBrokerService.java
(added)
+++ activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/xbean/XBeanBrokerService.java
Thu Jun  4 18:19:18 2009
@@ -0,0 +1,13 @@
+package org.apache.activemq.xbean;
+
+import org.apache.activemq.broker.BrokerService;
+import org.springframework.context.ApplicationContext;
+/**
+ * BrokerService subclass exposing an application context accessor.
+ * NOTE(review): getApplicationContext() is an unimplemented stub that always
+ * returns null, so this looks like a placeholder - confirm before relying on it.
+ */
+public class XBeanBrokerService extends BrokerService {
+
+    public ApplicationContext getApplicationContext() {
+        // TODO not implemented yet: no context is available, callers receive null
+        return null;
+    }
+
+}



Mime
View raw message