activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r358217 - in /incubator/activemq/trunk/activemq-core/src: main/java/org/activemq/transport/stomp/ main/resources/META-INF/services/org/activemq/transport/ main/resources/META-INF/services/org/activemq/wireformat/ test/java/org/activemq/tran...
Date Wed, 21 Dec 2005 05:45:20 GMT
Author: chirino
Date: Tue Dec 20 21:45:09 2005
New Revision: 358217

URL: http://svn.apache.org/viewcvs?rev=358217&view=rev
Log:
Got the stomp wireformat working again and added some more tests to verify that it is working.
The stomp transport factory just reuses the tcp transport and sets the wireformat to be stomp.

Added:
    incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/stomp/Connect.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/stomp/StompTransportFactory.java
      - copied, changed from r358098, incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/stomp/StompTransportServerChannelFactory.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/stomp/StompWireFormatFactory.java
    incubator/activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/activemq/transport/stomp   (with props)
    incubator/activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/activemq/wireformat/stomp   (with props)
    incubator/activemq/trunk/activemq-core/src/test/java/org/activemq/transport/stomp/StompTest.java
Removed:
    incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/stomp/StompTransportChannel.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/stomp/StompTransportServerChannel.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/stomp/StompTransportServerChannelFactory.java
Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/stomp/CommandEnvelope.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/stomp/CommandParser.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/stomp/Send.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/stomp/Stomp.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/stomp/StompWireFormat.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/activemq/transport/stomp/StompWireFormatTest.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/stomp/CommandEnvelope.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/stomp/CommandEnvelope.java?rev=358217&r1=358216&r2=358217&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/stomp/CommandEnvelope.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/stomp/CommandEnvelope.java Tue Dec 20 21:45:09 2005
@@ -8,19 +8,31 @@
 import java.util.Properties;
 
 public class CommandEnvelope {
+    
     private final Command command;
     private final Properties headers;
-
-    CommandEnvelope(Command command, Properties headers) {
+    private final ResponseListener responseListener;
+    
+    public CommandEnvelope(Command command, Properties headers) {
+        this(command, headers, null);
+    }
+    
+    public CommandEnvelope(Command command, Properties headers, ResponseListener responseListener) {
         this.command = command;
         this.headers = headers;
+        this.responseListener = responseListener;
     }
 
-    Properties getHeaders() {
+    public Properties getHeaders() {
         return headers;
     }
 
-    Command getCommand() {
+    public Command getCommand() {
         return command;
     }
+
+    public ResponseListener getResponseListener() {
+        return responseListener;
+    }
+    
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/stomp/CommandParser.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/stomp/CommandParser.java?rev=358217&r1=358216&r2=358217&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/stomp/CommandParser.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/stomp/CommandParser.java Tue Dec 20 21:45:09 2005
@@ -14,7 +14,6 @@
 import java.net.ProtocolException;
 
 class CommandParser {
-    private String clientId;
     private final StompWireFormat format;
 
     CommandParser(StompWireFormat wireFormat) {
@@ -33,8 +32,10 @@
             throw new IOException("connection was closed");
         }
 
-        // figure corrent command and return it
+        // figure correct command and return it
         StompCommand command = null;
+        if (line.startsWith(Stomp.Commands.CONNECT))
+            command = new Connect(format);
         if (line.startsWith(Stomp.Commands.SUBSCRIBE))
             command = new Subscribe(format);
         if (line.startsWith(Stomp.Commands.SEND))
@@ -58,27 +59,28 @@
             throw new ProtocolException("Unknown command [" + line + "]");
         }
 
-        CommandEnvelope envelope = command.build(line, in);
-        if (envelope.getHeaders().containsKey(Stomp.Headers.RECEIPT_REQUESTED)) {
-            final short id = StompWireFormat.generateCommandId();
-            envelope.getCommand().setCommandId(id);
+        final CommandEnvelope envelope = command.build(line, in);
+        final short commandId = format.generateCommandId();
+        final String client_packet_key = envelope.getHeaders().getProperty(Stomp.Headers.RECEIPT_REQUESTED);
+        final boolean receiptRequested = client_packet_key!=null;
+        
+        envelope.getCommand().setCommandId(commandId);
+        if (receiptRequested || envelope.getResponseListener()!=null ) {
             envelope.getCommand().setResponseRequired(true);
-            final String client_packet_key = envelope.getHeaders().getProperty(Stomp.Headers.RECEIPT_REQUESTED);
-            format.addResponseListener(new ResponseListener() {
-                public boolean onResponse(Response receipt, DataOutput out) throws IOException {
-                    if (receipt.getCorrelationId() != id)
-                        return false;
-
-                    out.write(new FrameBuilder(Stomp.Responses.RECEIPT).addHeader(Stomp.Headers.Response.RECEIPT_ID, client_packet_key).toFrame());
-                    return true;
-                }
-            });
+            if( envelope.getResponseListener()!=null ) {
+                format.addResponseListener(envelope.getResponseListener());
+            } else {
+                format.addResponseListener(new ResponseListener() {
+                    public boolean onResponse(Response receipt, DataOutput out) throws IOException {
+                        if (receipt.getCorrelationId() != commandId)
+                            return false;
+                        out.write(new FrameBuilder(Stomp.Responses.RECEIPT).addHeader(Stomp.Headers.Response.RECEIPT_ID, client_packet_key).toFrame());
+                        return true;
+                    }
+                });
+            }
         }
 
         return envelope.getCommand();
-    }
-
-    void setClientId(String clientId) {
-        this.clientId = clientId;
     }
 }

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/stomp/Connect.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/stomp/Connect.java?rev=358217&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/stomp/Connect.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/stomp/Connect.java Tue Dec 20 21:45:09 2005
@@ -0,0 +1,84 @@
+/*
+ * Copyright (c) 2005 Your Corporation. All Rights Reserved.
+ */
+package org.activemq.transport.stomp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Properties;
+
+import org.activemq.command.ConnectionInfo;
+import org.activemq.command.ProducerInfo;
+import org.activemq.command.Response;
+import org.activemq.command.SessionInfo;
+
+class Connect implements StompCommand {
+    private HeaderParser headerParser = new HeaderParser();
+    private StompWireFormat format;
+
+    Connect(StompWireFormat format) {
+        this.format = format;
+    }
+
+    public CommandEnvelope build(String commandLine, DataInput in) throws IOException {
+        
+        Properties headers = headerParser.parse(in);
+        
+        
+        // allow anyone to login for now
+        String login = headers.getProperty(Stomp.Headers.Connect.LOGIN);
+        String passcode = headers.getProperty(Stomp.Headers.Connect.PASSCODE);
+        String clientId = headers.getProperty(Stomp.Headers.Connect.CLIENT_ID);
+        
+        final ConnectionInfo connectionInfo = new ConnectionInfo();
+        connectionInfo.setConnectionId(format.getConnectionId());
+        if( clientId!=null )
+            connectionInfo.setClientId(clientId);
+        else
+            connectionInfo.setClientId(""+connectionInfo.getConnectionId().toString());
+        connectionInfo.setResponseRequired(true);
+        connectionInfo.setUserName(login);
+        connectionInfo.setPassword(passcode);
+
+        while (in.readByte() != 0) {
+        }
+        
+        return new CommandEnvelope(connectionInfo, headers, new ResponseListener() {
+            public boolean onResponse(Response receipt, DataOutput out) throws IOException {
+                
+                if (receipt.getCorrelationId() != connectionInfo.getCommandId())
+                    return false;
+                
+                final SessionInfo sessionInfo = new SessionInfo(format.getSessionId());
+                sessionInfo.setCommandId(format.generateCommandId());
+                sessionInfo.setResponseRequired(false);
+                
+                final ProducerInfo producerInfo = new ProducerInfo(format.getProducerId());
+                producerInfo.setCommandId(format.generateCommandId());
+                producerInfo.setResponseRequired(true);
+                
+                format.addResponseListener(new ResponseListener() {
+                    public boolean onResponse(Response receipt, DataOutput out) throws IOException {
+                        if (receipt.getCorrelationId() != producerInfo.getCommandId())
+                            return false;
+                        
+                        format.onFullyConnected();
+                        
+                        StringBuffer buffer = new StringBuffer();
+                        buffer.append(Stomp.Responses.CONNECTED).append(Stomp.NEWLINE);
+                        buffer.append(Stomp.Headers.Connected.SESSION).append(Stomp.Headers.SEPERATOR).append(connectionInfo.getClientId()).append(Stomp.NEWLINE).append(
+                                Stomp.NEWLINE);
+                        buffer.append(Stomp.NULL);
+                        out.writeBytes(buffer.toString());
+                        return true;
+                    }
+                });
+
+                format.addToPendingReadCommands(sessionInfo);
+                format.addToPendingReadCommands(producerInfo);
+                return true;
+            }
+        });
+    }
+}

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/stomp/Send.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/stomp/Send.java?rev=358217&r1=358216&r2=358217&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/stomp/Send.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/stomp/Send.java Tue Dec 20 21:45:09 2005
@@ -68,7 +68,9 @@
             msg = text;
         }
 
-        msg.setMessageId(format.generateMessageId());
+        msg.setProducerId(format.getProducerId());
+        msg.setMessageId(format.createMessageId());
+        
 
         ActiveMQDestination d = DestinationNamer.convert(destination);
         msg.setDestination(d);

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/stomp/Stomp.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/stomp/Stomp.java?rev=358217&r1=358216&r2=358217&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/stomp/Stomp.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/stomp/Stomp.java Tue Dec 20 21:45:09 2005
@@ -80,6 +80,7 @@
         public interface Connect {
             String LOGIN = "login";
             String PASSCODE = "passcode";
+            String CLIENT_ID = "client-id";
         }
 
         public interface Error {

Copied: incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/stomp/StompTransportFactory.java (from r358098, incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/stomp/StompTransportServerChannelFactory.java)
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/stomp/StompTransportFactory.java?p2=incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/stomp/StompTransportFactory.java&p1=incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/stomp/StompTransportServerChannelFactory.java&r1=358098&r2=358217&rev=358217&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/stomp/StompTransportServerChannelFactory.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/stomp/StompTransportFactory.java Tue Dec 20 21:45:09 2005
@@ -17,21 +17,32 @@
  **/
 package org.activemq.transport.stomp;
 
-import org.activemq.transport.TransportFactory;
-import org.activemq.transport.TransportServer;
-
 import java.io.IOException;
 import java.net.URI;
+import java.net.URISyntaxException;
+
+import org.activemq.transport.TransportFactory;
+import org.activemq.transport.TransportServer;
+import org.activemq.util.IOExceptionSupport;
 
 /**
  * A <a href="http://stomp.codehaus.org/">Stomp</a> transport factory
  * 
  * @version $Revision: 1.1.1.1 $
  */
-public class StompTransportServerChannelFactory extends TransportFactory {
+public class StompTransportFactory extends TransportFactory {
 
     public TransportServer doBind(String brokerId, URI location) throws IOException {
-        return new StompTransportServerChannel(brokerId, location);
+        try {
+            URI tcpURI = new URI(
+                    "tcp://"+location.getHost()+
+                    (location.getPort()>=0 ? ":"+location.getPort() : "")+
+                    "?wireFormat=stomp"
+                    );
+            return TransportFactory.bind(brokerId, tcpURI);
+        } catch (URISyntaxException e) {
+            throw IOExceptionSupport.create(e);
+        }
     }
 
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/stomp/StompWireFormat.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/stomp/StompWireFormat.java?rev=358217&r1=358216&r2=358217&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/stomp/StompWireFormat.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/stomp/StompWireFormat.java Tue Dec 20 21:45:09 2005
@@ -3,75 +3,80 @@
  */
 package org.activemq.transport.stomp;
 
-import edu.emory.mathcs.backport.java.util.concurrent.BlockingQueue;
-import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
-import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList;
-import edu.emory.mathcs.backport.java.util.concurrent.LinkedBlockingQueue;
-import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.ProtocolException;
+import java.util.List;
+import java.util.Map;
+
+import javax.jms.JMSException;
 
+import org.activeio.ByteArrayOutputStream;
 import org.activeio.Packet;
+import org.activeio.adapter.PacketInputStream;
 import org.activeio.command.WireFormat;
+import org.activeio.packet.ByteArrayPacket;
+import org.activemq.command.ActiveMQBytesMessage;
 import org.activemq.command.ActiveMQDestination;
 import org.activemq.command.ActiveMQTextMessage;
+import org.activemq.command.Command;
 import org.activemq.command.CommandTypes;
 import org.activemq.command.ConnectionId;
 import org.activemq.command.ConnectionInfo;
 import org.activemq.command.ConsumerId;
-import org.activemq.command.Command;
 import org.activemq.command.FlushCommand;
 import org.activemq.command.LocalTransactionId;
 import org.activemq.command.MessageId;
+import org.activemq.command.ProducerId;
 import org.activemq.command.Response;
 import org.activemq.command.SessionId;
-import org.activemq.command.SessionInfo;
-import org.activemq.command.ActiveMQBytesMessage;
 import org.activemq.command.TransactionId;
+import org.activemq.util.IOExceptionSupport;
 import org.activemq.util.IdGenerator;
+import org.activemq.util.LongSequenceGenerator;
 
-import javax.jms.JMSException;
-import java.io.BufferedReader;
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.net.DatagramPacket;
-import java.net.ProtocolException;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
+import edu.emory.mathcs.backport.java.util.concurrent.BlockingQueue;
+import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
+import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList;
+import edu.emory.mathcs.backport.java.util.concurrent.LinkedBlockingQueue;
+import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
 
 /**
  * Implements the <a href="http://stomp.codehaus.org/">Stomp</a> protocol.
  */
 public class StompWireFormat implements WireFormat {
 
-    static final IdGenerator PACKET_IDS = new IdGenerator();
-    static final IdGenerator clientIds = new IdGenerator();
+    private static final IdGenerator connectionIdGenerator = new IdGenerator();
     private static int transactionIdCounter;
 
     private int version = 1;
     private CommandParser commandParser = new CommandParser(this);
     private HeaderParser headerParser = new HeaderParser();
 
-    private DataInputStream in;
-
-    private String clientId;
-
     private BlockingQueue pendingReadCommands = new LinkedBlockingQueue();
     private BlockingQueue pendingWriteFrames = new LinkedBlockingQueue();
     private List receiptListeners = new CopyOnWriteArrayList();
-    private SessionId sessionId;
     private Map subscriptions = new ConcurrentHashMap();
     private List ackListeners = new CopyOnWriteArrayList();
     private final Map transactions = new ConcurrentHashMap();
-    private ConnectionId connectionId;
+    private short lastCommandId;
 
+    private final ConnectionId connectionId = new ConnectionId(connectionIdGenerator.generateId());
+    private final SessionId sessionId = new SessionId(connectionId, -1);
+    private final ProducerId producerId = new ProducerId(sessionId, 1);
+    
+    private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
+    private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
+    
     void addResponseListener(ResponseListener listener) {
         receiptListeners.add(listener);
     }
 
+    boolean connected = false;
+    
     public Command readCommand(DataInput in) throws IOException, JMSException {
         Command pending = (Command) AsyncHelper.tryUntilNotInterrupted(new AsyncHelper.HelperWithReturn() {
             public Object cycle() throws InterruptedException {
@@ -83,7 +88,12 @@
         }
 
         try {
-            return commandParser.parse(in);
+            Command command = commandParser.parse(in);
+            if( !connected ) {
+                if( command.getDataStructureType() != ConnectionInfo.DATA_STRUCTURE_TYPE )
+                    throw new IOException("Not yet connected.");
+            }
+            return command;
         }
         catch (ProtocolException e) {
             sendError(e.getMessage());
@@ -149,265 +159,32 @@
             }
         });
     }
-
-    /**
-     * some transports may register their streams (e.g. Tcp)
-     * 
-     * @param dataOut
-     * @param dataIn
-     */
-    public void registerTransportStreams(DataOutputStream dataOut, DataInputStream dataIn) {
-        this.in = dataIn;
-    }
-
-    /**
-     * Some wire formats require a handshake at start-up
-     * 
-     * @throws java.io.IOException
-     */
-    public void initiateServerSideProtocol() throws IOException {
-        BufferedReader in = new BufferedReader(new InputStreamReader(this.in));
-        String first_line = in.readLine();
-        if (!first_line.startsWith(Stomp.Commands.CONNECT)) {
-            throw new IOException("First line does not begin with with " + Stomp.Commands.CONNECT);
-        }
-
-        Properties headers = headerParser.parse(in);
-        // if (!headers.containsKey(TTMP.Headers.Connect.LOGIN))
-        // System.err.println("Required header [" + TTMP.Headers.Connect.LOGIN +
-        // "] missing");
-        // if (!headers.containsKey(TTMP.Headers.Connect.PASSCODE))
-        // System.err.println("Required header [" +
-        // TTMP.Headers.Connect.PASSCODE + "] missing");
-
-        // allow anyone to login for now
-
-        String login = headers.getProperty(Stomp.Headers.Connect.LOGIN);
-        String passcode = headers.getProperty(Stomp.Headers.Connect.PASSCODE);
-
-        // skip to end of the packet
-        while (in.read() != 0) {
-        }
-        final ConnectionInfo info = new ConnectionInfo();
-        clientId = clientIds.generateId();
-        commandParser.setClientId(clientId);
-
-        info.setClientId(clientId);
-        info.setResponseRequired(true);
-        // info.setClientVersion(Integer.toString(getCurrentWireFormatVersion()));
-        final short commandId = generateCommandId();
-        info.setCommandId(commandId);
-        info.setUserName(login);
-        info.setPassword(passcode);
-        // info.setStartTime(System.currentTimeMillis());
+    
+    public void onFullyConnected() {
+        connected=true;
+    }
+    
+    public void addToPendingReadCommands(final Command info) {
         AsyncHelper.tryUntilNotInterrupted(new AsyncHelper.Helper() {
             public void cycle() throws InterruptedException {
                 pendingReadCommands.put(info);
             }
         });
-
-        addResponseListener(new ResponseListener() {
-            public boolean onResponse(Response receipt, DataOutput out) {
-                if (receipt.getCorrelationId() != commandId)
-                    return false;
-                sessionId = generateSessionId();
-
-                final SessionInfo info = new SessionInfo();
-                info.setCommandId(generateCommandId());
-                info.setSessionId(sessionId);
-                info.setResponseRequired(true);
-
-                AsyncHelper.tryUntilNotInterrupted(new AsyncHelper.Helper() {
-                    public void cycle() throws InterruptedException {
-                        pendingReadCommands.put(info);
-                    }
-                });
-
-                addResponseListener(new ResponseListener() {
-                    public boolean onResponse(Response receipt, DataOutput out) throws IOException {
-                        if (receipt.getCorrelationId() != commandId)
-                            return false;
-                        StringBuffer buffer = new StringBuffer();
-                        buffer.append(Stomp.Responses.CONNECTED).append(Stomp.NEWLINE);
-                        buffer.append(Stomp.Headers.Connected.SESSION).append(Stomp.Headers.SEPERATOR).append(clientId).append(Stomp.NEWLINE).append(
-                                Stomp.NEWLINE);
-                        buffer.append(Stomp.NULL);
-                        out.writeBytes(buffer.toString());
-                        return true;
-                    }
-                });
-
-                return true;
-            }
-        });
     }
 
-    /**
-     * Creates a new copy of this wire format so it can be used in another
-     * thread/context
-     */
-    public WireFormat copy() {
-        return new StompWireFormat();
-    }
-
-    /* Stuff below here is leaky stuff we don't actually need */
-
-    /**
-     * Some wire formats require a handshake at start-up
-     * 
-     * @throws java.io.IOException
-     */
-    public void initiateClientSideProtocol() throws IOException {
-        throw new UnsupportedOperationException("Not yet implemented!");
-    }
-
-    /**
-     * Can this wireformat process packets of this version
-     * 
-     * @param version
-     *            the version number to test
-     * @return true if can accept the version
-     */
-    public boolean canProcessWireFormatVersion(int version) {
-        return version == getCurrentWireFormatVersion();
-    }
-
-    /**
-     * @return the current version of this wire format
-     */
-    public int getCurrentWireFormatVersion() {
-        return 1;
-    }
-
-    /**
-     * @return Returns the enableCaching.
-     */
-    public boolean isCachingEnabled() {
-        return false;
-    }
-
-    /**
-     * @param enableCaching
-     *            The enableCaching to set.
-     */
-    public void setCachingEnabled(boolean enableCaching) {
-        // never
-    }
-
-    /**
-     * some wire formats will implement their own fragementation
-     * 
-     * @return true unless a wire format supports it's own fragmentation
-     */
-    public boolean doesSupportMessageFragmentation() {
-        return false;
-    }
-
-    /**
-     * Some wire formats will not be able to understand compressed messages
-     * 
-     * @return true unless a wire format cannot understand compression
-     */
-    public boolean doesSupportMessageCompression() {
-        return false;
-    }
-
-    /**
-     * Writes the given package to a new datagram
-     * 
-     * @param channelID
-     *            is the unique channel ID
-     * @param packet
-     *            is the packet to write
-     * @return
-     * @throws java.io.IOException
-     * @throws javax.jms.JMSException
-     */
-    public DatagramPacket writeCommand(String channelID, Command packet) throws IOException, JMSException {
-        throw new UnsupportedOperationException("Will not be implemented");
-    }
-
-    /**
-     * Reads the packet from the given byte[]
-     * 
-     * @param bytes
-     * @param offset
-     * @param length
-     * @return
-     * @throws java.io.IOException
-     */
-    public Command fromBytes(byte[] bytes, int offset, int length) throws IOException {
-        throw new UnsupportedOperationException("Will not be implemented");
-    }
-
-    /**
-     * Reads the packet from the given byte[]
-     * 
-     * @param bytes
-     * @return
-     * @throws java.io.IOException
-     */
-    public Command fromBytes(byte[] bytes) throws IOException {
-        throw new UnsupportedOperationException("Will not be implemented");
-    }
-
-    /**
-     * A helper method which converts a packet into a byte array
-     * 
-     * @param packet
-     * @return a byte array representing the packet using some wire protocol
-     * @throws java.io.IOException
-     * @throws javax.jms.JMSException
-     */
-    public byte[] toBytes(Command packet) throws IOException, JMSException {
-        throw new UnsupportedOperationException("Will not be implemented");
-    }
-
-    /**
-     * A helper method for working with sockets where the first byte is read
-     * first, then the rest of the message is read. <p/> Its common when dealing
-     * with sockets to have different timeout semantics until the first non-zero
-     * byte is read of a message, after which time a zero timeout is used.
-     * 
-     * @param firstByte
-     *            the first byte of the packet
-     * @param in
-     *            the rest of the packet
-     * @return
-     * @throws java.io.IOException
-     */
-    public Command readCommand(int firstByte, DataInput in) throws IOException {
-        throw new UnsupportedOperationException("Will not be implemented");
-    }
-
-    /**
-     * Read a packet from a Datagram packet from the given channelID. If the
-     * packet is from the same channel ID as it was sent then we have a
-     * loop-back so discard the packet
-     * 
-     * @param channelID
-     *            is the unique channel ID
-     * @param dpacket
-     * @return the packet read from the datagram or null if it should be
-     *         discarded
-     * @throws java.io.IOException
-     */
-    public Command readCommand(String channelID, DatagramPacket dpacket) throws IOException {
-        throw new UnsupportedOperationException("Will not be implemented");
-    }
 
     void clearTransactionId(String user_tx_id) {
         this.transactions.remove(user_tx_id);
     }
 
-    String getClientId() {
-        return this.clientId;
-    }
-
     public SessionId getSessionId() {
         return sessionId;
     }
 
+    public ProducerId getProducerId() {
+        return producerId;
+    }
+    
     public void addSubscription(Subscription s) {
         if (subscriptions.containsKey(s.getDestination())) {
             Subscription old = (Subscription) subscriptions.get(s.getDestination());
@@ -462,44 +239,58 @@
         return connectionId;
     }
 
-    public void setConnectionId(ConnectionId connectionId) {
-        this.connectionId = connectionId;
-    }
-
     public static synchronized int generateTransactionId() {
         return ++transactionIdCounter;
     }
 
     public ConsumerId createConsumerId() {
-        throw new RuntimeException("TODO!!");
-    }
-
-    public MessageId generateMessageId() {
-        throw new RuntimeException("TODO!!");
+        return new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId());
     }
-
-    // TODO static???
-    public static short generateCommandId() {
-        throw new RuntimeException("TODO!!");
+    
+    public MessageId createMessageId() {
+        return new MessageId(producerId, messageIdGenerator.getNextSequenceId());
+    }
+    
+    synchronized public short generateCommandId() {
+        return lastCommandId++;
     }
 
     public SessionId generateSessionId() {
         throw new RuntimeException("TODO!!");
     }
 
-    public Packet marshal(Object arg0) throws IOException {
-        throw new RuntimeException("TODO!!");
+    public Packet marshal(Object command) throws IOException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        DataOutputStream dos = new DataOutputStream(baos);
+        marshal(command, dos);
+        dos.close();
+        return new ByteArrayPacket(baos.toByteSequence());
     }
 
-    public Object unmarshal(Packet arg0) throws IOException {
-        throw new RuntimeException("TODO!!");
+    public Object unmarshal(Packet packet) throws IOException {
+        PacketInputStream stream = new PacketInputStream(packet);
+        DataInputStream dis = new DataInputStream(stream);
+        return unmarshal(dis);
     }
 
-    public void marshal(Object arg0, DataOutputStream arg1) throws IOException {
-        throw new RuntimeException("TODO!!");
+    public void marshal(Object command, DataOutputStream os) throws IOException {
+        try {
+            writeCommand((Command) command, os);
+        } catch (IOException e) {
+            throw e;
+        } catch (JMSException e) {
+            throw IOExceptionSupport.create(e);
+        }
     }
 
-    public Object unmarshal(DataInputStream arg0) throws IOException {
-        throw new RuntimeException("TODO!!");
+    public Object unmarshal(DataInputStream is) throws IOException {
+        try {
+            return readCommand(is);
+        } catch (IOException e) {
+            throw e;
+        } catch (JMSException e) {
+            throw IOExceptionSupport.create(e);
+        }
     }
+
 }

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/stomp/StompWireFormatFactory.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/stomp/StompWireFormatFactory.java?rev=358217&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/stomp/StompWireFormatFactory.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/stomp/StompWireFormatFactory.java Tue Dec 20 21:45:09 2005
@@ -0,0 +1,16 @@
+/*
+ * Copyright (c) 2005 Your Corporation. All Rights Reserved.
+ */
+package org.activemq.transport.stomp;
+
+import org.activeio.command.WireFormat;
+import org.activeio.command.WireFormatFactory;
+
+/**
+ * Creates WireFormat objects that implement the <a href="http://stomp.codehaus.org/">Stomp</a> protocol.
+ */
+public class StompWireFormatFactory implements WireFormatFactory {
+    public WireFormat createWireFormat() {
+        return new StompWireFormat();
+    }
+}

Added: incubator/activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/activemq/transport/stomp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/activemq/transport/stomp?rev=358217&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/activemq/transport/stomp (added)
+++ incubator/activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/activemq/transport/stomp Tue Dec 20 21:45:09 2005
@@ -0,0 +1 @@
+class=org.activemq.transport.stomp.StompTransportFactory

Propchange: incubator/activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/activemq/transport/stomp
------------------------------------------------------------------------------
    svn:executable = *

Added: incubator/activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/activemq/wireformat/stomp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/activemq/wireformat/stomp?rev=358217&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/activemq/wireformat/stomp (added)
+++ incubator/activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/activemq/wireformat/stomp Tue Dec 20 21:45:09 2005
@@ -0,0 +1 @@
+class=org.activemq.transport.stomp.StompWireFormatFactory
\ No newline at end of file

Propchange: incubator/activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/activemq/wireformat/stomp
------------------------------------------------------------------------------
    svn:executable = *

Added: incubator/activemq/trunk/activemq-core/src/test/java/org/activemq/transport/stomp/StompTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/activemq/transport/stomp/StompTest.java?rev=358217&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/activemq/transport/stomp/StompTest.java (added)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/activemq/transport/stomp/StompTest.java Tue Dec 20 21:45:09 2005
@@ -0,0 +1,121 @@
+package org.activemq.transport.stomp;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.net.URI;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.activemq.ActiveMQConnectionFactory;
+import org.activemq.CombinationTestSupport;
+import org.activemq.broker.BrokerService;
+import org.activemq.broker.TransportConnector;
+import org.activemq.command.ActiveMQDestination;
+import org.activemq.command.ActiveMQQueue;
+
+public class StompTest extends CombinationTestSupport {
+
+    private BrokerService broker;
+    private TransportConnector connector;
+    private Socket stompSocket;
+    private ByteArrayOutputStream inputBuffer;
+    private Connection connection;
+    private Session session;
+    private ActiveMQQueue queue;
+
+    protected void setUp() throws Exception {
+        broker = new BrokerService();
+        broker.setPersistent(false);
+        connector = broker.addConnector("stomp://localhost:0");
+        broker.start();
+        
+        URI connectUri = connector.getConnectUri();
+        stompSocket = new Socket(connectUri.getHost(), connectUri.getPort());
+        inputBuffer = new ByteArrayOutputStream();
+        
+        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost");
+        connection = cf.createConnection();
+        session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
+        queue = new ActiveMQQueue("TEST");
+        connection.start();
+    }
+    
+    protected void tearDown() throws Exception {
+        connection.close();
+        stompSocket.close();
+        broker.stop();
+    }
+
+    public void sendFrame(String data) throws Exception {
+        byte[] bytes = data.getBytes("UTF-8");
+        OutputStream outputStream = stompSocket.getOutputStream();
+        for (int i = 0; i < bytes.length; i++) {
+            outputStream.write(bytes[i]);
+        }
+        outputStream.flush();
+    }
+
+    public String receiveFrame(long timeOut) throws Exception {
+        stompSocket.setSoTimeout((int) timeOut);
+        InputStream is = stompSocket.getInputStream();
+        int c=0;
+        for(;;) {
+            c = is.read();
+            if( c < 0 ) {
+                throw new IOException("socket closed.");
+            } else if( c == 0 ) {
+                byte[] ba = inputBuffer.toByteArray();
+                inputBuffer.reset();
+                return new String(ba, "UTF-8");
+            } else {
+                inputBuffer.write(c);
+            }
+        } 
+    }
+    
+    public void testConnect() throws Exception {
+        
+        String connect_frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n" + "\n" + Stomp.NULL;
+        sendFrame(connect_frame);
+     
+        String f = receiveFrame(10000);
+        assertTrue(f.startsWith("CONNECTED"));
+        
+    }
+    
+    public void testSendMessage() throws Exception {
+        
+        MessageConsumer consumer = session.createConsumer(queue);
+        
+        String frame = 
+            "CONNECT\n" + 
+            "login: brianm\n" + 
+            "passcode: wombats\n\n"+
+            Stomp.NULL;
+        sendFrame(frame);
+     
+        frame = receiveFrame(10000);
+        assertTrue(frame.startsWith("CONNECTED"));
+        
+        frame = 
+            "SEND\n" + 
+            "destination:/queue/TEST\n\n" + 
+            "Hello World" + 
+            Stomp.NULL;
+        sendFrame(frame);
+        
+        TextMessage message = (TextMessage) consumer.receive(1000);
+        assertNotNull(message);
+        assertEquals("Hello World", message.getText());
+        
+    }
+    
+}

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/activemq/transport/stomp/StompWireFormatTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/activemq/transport/stomp/StompWireFormatTest.java?rev=358217&r1=358216&r2=358217&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/activemq/transport/stomp/StompWireFormatTest.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/activemq/transport/stomp/StompWireFormatTest.java Tue Dec 20 21:45:09 2005
@@ -31,9 +31,6 @@
         ByteArrayOutputStream bout = new ByteArrayOutputStream();
         DataOutputStream dout = new DataOutputStream(bout);
 
-        wire.registerTransportStreams(dout, din);
-        wire.initiateServerSideProtocol();
-
         ConnectionInfo ci = (ConnectionInfo) wire.readCommand(din);
         assertNotNull(ci);
         assertTrue(ci.isResponseRequired());



Mime
View raw message