activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r563982 [24/32] - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/ main/java/org/apache/activemq/advisory/ main/java/org/apache/activemq/blob/ main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/broker/jm...
Date Wed, 08 Aug 2007 18:58:13 GMT
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOTransportFactory.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOTransportFactory.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOTransportFactory.java Wed Aug  8 11:56:59 2007
@@ -37,73 +37,74 @@
 import org.apache.activemq.transport.tcp.TcpTransportServer;
 
 public class NIOTransportFactory extends TcpTransportFactory {
-		
-	protected TcpTransportServer createTcpTransportServer(URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
-		return new TcpTransportServer(this, location, serverSocketFactory) {
-			protected Transport createTransport(Socket socket, WireFormat format) throws IOException {
-				return new NIOTransport(format,socket);
-			}			
-		};
-	}
-	
-	protected TcpTransport createTcpTransport(WireFormat wf, SocketFactory socketFactory, URI location, URI localLocation) throws UnknownHostException, IOException {
-		return new NIOTransport(wf, socketFactory, location, localLocation);
-	}
 
-	
+    protected TcpTransportServer createTcpTransportServer(URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
+        return new TcpTransportServer(this, location, serverSocketFactory) {
+            protected Transport createTransport(Socket socket, WireFormat format) throws IOException {
+                return new NIOTransport(format, socket);
+            }
+        };
+    }
+
+    protected TcpTransport createTcpTransport(WireFormat wf, SocketFactory socketFactory, URI location, URI localLocation) throws UnknownHostException, IOException {
+        return new NIOTransport(wf, socketFactory, location, localLocation);
+    }
+
     protected ServerSocketFactory createServerSocketFactory() {
         return new ServerSocketFactory() {
-			public ServerSocket createServerSocket(int port) throws IOException {
-		        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
-		        serverSocketChannel.socket().bind(new InetSocketAddress(port));
-				return serverSocketChannel.socket();
-			}
-			public ServerSocket createServerSocket(int port, int backlog) throws IOException {
-		        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
-		        serverSocketChannel.socket().bind(new InetSocketAddress(port), backlog);
-				return serverSocketChannel.socket();
-			}
-			public ServerSocket createServerSocket(int port, int backlog, InetAddress ifAddress) throws IOException {
-		        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
-		        serverSocketChannel.socket().bind(new InetSocketAddress(ifAddress, port), backlog);
-				return serverSocketChannel.socket();
-			}
+            public ServerSocket createServerSocket(int port) throws IOException {
+                ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
+                serverSocketChannel.socket().bind(new InetSocketAddress(port));
+                return serverSocketChannel.socket();
+            }
+
+            public ServerSocket createServerSocket(int port, int backlog) throws IOException {
+                ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
+                serverSocketChannel.socket().bind(new InetSocketAddress(port), backlog);
+                return serverSocketChannel.socket();
+            }
+
+            public ServerSocket createServerSocket(int port, int backlog, InetAddress ifAddress) throws IOException {
+                ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
+                serverSocketChannel.socket().bind(new InetSocketAddress(ifAddress, port), backlog);
+                return serverSocketChannel.socket();
+            }
         };
     }
-    
+
     protected SocketFactory createSocketFactory() {
         return new SocketFactory() {
 
-        	public Socket createSocket() throws IOException {
-		        SocketChannel channel = SocketChannel.open();
-        		return channel.socket();
-        	}
-        	
-			public Socket createSocket(String host, int port) throws IOException, UnknownHostException {
-		        SocketChannel channel = SocketChannel.open();
-		        channel.connect(new InetSocketAddress(host, port));
-				return channel.socket();
-			}
-
-			public Socket createSocket(InetAddress address, int port) throws IOException {
-		        SocketChannel channel = SocketChannel.open();
-		        channel.connect(new InetSocketAddress(address, port));
-				return channel.socket();
-			}
-
-			public Socket createSocket(String address, int port, InetAddress localAddresss, int localPort) throws IOException, UnknownHostException {
-		        SocketChannel channel = SocketChannel.open();
-		        channel.socket().bind(new InetSocketAddress(localAddresss, localPort));
-		        channel.connect(new InetSocketAddress(address, port));
-				return channel.socket();
-			}
-
-			public Socket createSocket(InetAddress address, int port, InetAddress localAddresss, int localPort) throws IOException {
-		        SocketChannel channel = SocketChannel.open();
-		        channel.socket().bind(new InetSocketAddress(localAddresss, localPort));
-		        channel.connect(new InetSocketAddress(address, port));
-				return channel.socket();
-			}
+            public Socket createSocket() throws IOException {
+                SocketChannel channel = SocketChannel.open();
+                return channel.socket();
+            }
+
+            public Socket createSocket(String host, int port) throws IOException, UnknownHostException {
+                SocketChannel channel = SocketChannel.open();
+                channel.connect(new InetSocketAddress(host, port));
+                return channel.socket();
+            }
+
+            public Socket createSocket(InetAddress address, int port) throws IOException {
+                SocketChannel channel = SocketChannel.open();
+                channel.connect(new InetSocketAddress(address, port));
+                return channel.socket();
+            }
+
+            public Socket createSocket(String address, int port, InetAddress localAddresss, int localPort) throws IOException, UnknownHostException {
+                SocketChannel channel = SocketChannel.open();
+                channel.socket().bind(new InetSocketAddress(localAddresss, localPort));
+                channel.connect(new InetSocketAddress(address, port));
+                return channel.socket();
+            }
+
+            public Socket createSocket(InetAddress address, int port, InetAddress localAddresss, int localPort) throws IOException {
+                SocketChannel channel = SocketChannel.open();
+                channel.socket().bind(new InetSocketAddress(localAddresss, localPort));
+                channel.connect(new InetSocketAddress(address, port));
+                return channel.socket();
+            }
         };
     }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java Wed Aug  8 11:56:59 2007
@@ -34,75 +34,78 @@
  */
 final public class SelectorManager {
 
-	static final public SelectorManager singleton = new SelectorManager();
-	static SelectorManager getInstance() { 
-		return singleton;
-	}
-	
-	public interface Listener {
-		public void onSelect(SelectorSelection selector);
-		public void onError(SelectorSelection selection, Throwable error);
-	}
-	
-	private Executor selectorExecutor = Executors.newCachedThreadPool(new ThreadFactory(){
-		public Thread newThread(Runnable r) {
-			Thread rc = new Thread(r);
-			rc.setName("NIO Transport Thread");
-			return rc;
-		}});
-	private Executor channelExecutor = selectorExecutor;
-	private LinkedList<SelectorWorker> freeWorkers = new LinkedList<SelectorWorker>();
-	private int maxChannelsPerWorker = 64;
-	
-	public synchronized SelectorSelection register(SocketChannel socketChannel, Listener listener)
-	 	throws IOException {
-
-		SelectorWorker worker = null;
-		if (freeWorkers.size() > 0) {
-			worker = freeWorkers.getFirst();
-		} else {
-			worker = new SelectorWorker(this);
-			freeWorkers.addFirst(worker);
-		}
-
-		SelectorSelection selection = new SelectorSelection(worker, socketChannel, listener);				
-		return selection;
-	}
-
-	synchronized void onWorkerFullEvent(SelectorWorker worker) {
-		freeWorkers.remove(worker);
-	}
-
-	synchronized public void onWorkerEmptyEvent(SelectorWorker worker) {
-		freeWorkers.remove(worker);
-	}
-
-	synchronized public void onWorkerNotFullEvent(SelectorWorker worker) {
-		freeWorkers.add(worker);
-	}
-
-	public Executor getChannelExecutor() {
-		return channelExecutor;
-	}
-
-	public void setChannelExecutor(Executor channelExecutor) {
-		this.channelExecutor = channelExecutor;
-	}
-
-	public int getMaxChannelsPerWorker() {
-		return maxChannelsPerWorker;
-	}
-
-	public void setMaxChannelsPerWorker(int maxChannelsPerWorker) {
-		this.maxChannelsPerWorker = maxChannelsPerWorker;
-	}
-
-	public Executor getSelectorExecutor() {
-		return selectorExecutor;
-	}
-
-	public void setSelectorExecutor(Executor selectorExecutor) {
-		this.selectorExecutor = selectorExecutor;
-	}
+    static final public SelectorManager singleton = new SelectorManager();
+
+    static SelectorManager getInstance() {
+        return singleton;
+    }
+
+    public interface Listener {
+        public void onSelect(SelectorSelection selector);
+
+        public void onError(SelectorSelection selection, Throwable error);
+    }
+
+    private Executor selectorExecutor = Executors.newCachedThreadPool(new ThreadFactory() {
+        public Thread newThread(Runnable r) {
+            Thread rc = new Thread(r);
+            rc.setName("NIO Transport Thread");
+            return rc;
+        }
+    });
+    private Executor channelExecutor = selectorExecutor;
+    private LinkedList<SelectorWorker> freeWorkers = new LinkedList<SelectorWorker>();
+    private int maxChannelsPerWorker = 64;
+
+    public synchronized SelectorSelection register(SocketChannel socketChannel, Listener listener)
+        throws IOException {
+
+        SelectorWorker worker = null;
+        if (freeWorkers.size() > 0) {
+            worker = freeWorkers.getFirst();
+        } else {
+            worker = new SelectorWorker(this);
+            freeWorkers.addFirst(worker);
+        }
+
+        SelectorSelection selection = new SelectorSelection(worker, socketChannel, listener);
+        return selection;
+    }
+
+    synchronized void onWorkerFullEvent(SelectorWorker worker) {
+        freeWorkers.remove(worker);
+    }
+
+    synchronized public void onWorkerEmptyEvent(SelectorWorker worker) {
+        freeWorkers.remove(worker);
+    }
+
+    synchronized public void onWorkerNotFullEvent(SelectorWorker worker) {
+        freeWorkers.add(worker);
+    }
+
+    public Executor getChannelExecutor() {
+        return channelExecutor;
+    }
+
+    public void setChannelExecutor(Executor channelExecutor) {
+        this.channelExecutor = channelExecutor;
+    }
+
+    public int getMaxChannelsPerWorker() {
+        return maxChannelsPerWorker;
+    }
+
+    public void setMaxChannelsPerWorker(int maxChannelsPerWorker) {
+        this.maxChannelsPerWorker = maxChannelsPerWorker;
+    }
+
+    public Executor getSelectorExecutor() {
+        return selectorExecutor;
+    }
+
+    public void setSelectorExecutor(Executor selectorExecutor) {
+        this.selectorExecutor = selectorExecutor;
+    }
 
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorWorker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorWorker.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorWorker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorWorker.java Wed Aug  8 11:56:59 2007
@@ -23,107 +23,106 @@
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 
-
 public class SelectorWorker implements Runnable {
 
-	private final static AtomicInteger NEXT_ID = new AtomicInteger();
+    private final static AtomicInteger NEXT_ID = new AtomicInteger();
 
-	final SelectorManager manager;
-	final Selector selector;
-	final int id = NEXT_ID.getAndIncrement(); 
-	final AtomicInteger useCounter = new AtomicInteger();
-	final private int maxChannelsPerWorker;
-
-
-	public SelectorWorker(SelectorManager manager) throws IOException {
-		this.manager = manager;
-		selector = Selector.open();
-		maxChannelsPerWorker = manager.getMaxChannelsPerWorker();
-	}
-	
-	void incrementUseCounter() {
-		int use = useCounter.getAndIncrement();
-		if( use == 0 ) {
-			manager.getSelectorExecutor().execute(this);
-		} else if( use+1 == maxChannelsPerWorker ) {
-			manager.onWorkerFullEvent(this);
-		}
-	}
-
-	void decrementUseCounter() {
-		int use = useCounter.getAndDecrement();
-		if (use == 1) {
-			manager.onWorkerEmptyEvent(this);
-		} else if (use == maxChannelsPerWorker ) {
-			manager.onWorkerNotFullEvent(this);
-		}
-	}
-
-	boolean isRunning() {
-		return useCounter.get()!=0;
-	}
-
-	public void run() {
-
-		String origName = Thread.currentThread().getName();
-		try {
-			Thread.currentThread().setName("Selector Worker: " + id);
-			while (isRunning()) {
-
-				int count = selector.select(10);
-				if (count == 0)
-					continue;
-				
-				if (!isRunning())
-					return;
-
-				// Get a java.util.Set containing the SelectionKey objects
-				// for all channels that are ready for I/O.
-				Set keys = selector.selectedKeys();
-
-				for (Iterator i = keys.iterator(); i.hasNext();) {
-					final SelectionKey key = (SelectionKey) i.next();
-					i.remove();
-
-					final SelectorSelection s = (SelectorSelection) key.attachment();
-					try {
-						s.disable();
-						
-						// Kick off another thread to find newly selected keys while we process the 
-						// currently selected keys                
-						manager.getChannelExecutor().execute(new Runnable() {
-							public void run() {
-								try {
-									s.onSelect();
-									s.enable();
-								} catch (Throwable e) {
-									s.onError(e);
-								}
-							}
-						});
-						
-					} catch ( Throwable e ) {
-						s.onError(e);
-					}
-					
-				}
-
-			}
-		} catch (IOException e) {
-			
-			// Don't accept any more slections
-			manager.onWorkerEmptyEvent(this);
-
-			// Notify all the selections that the error occurred.
-			Set keys = selector.keys();
-			for (Iterator i = keys.iterator(); i.hasNext();) {
-				SelectionKey key = (SelectionKey) i.next();
-				SelectorSelection s = (SelectorSelection) key.attachment();
-				s.onError(e);
-			}
-			
-		} finally {
-			Thread.currentThread().setName(origName);
-		}
-	}
-}
\ No newline at end of file
+    final SelectorManager manager;
+    final Selector selector;
+    final int id = NEXT_ID.getAndIncrement();
+    final AtomicInteger useCounter = new AtomicInteger();
+    final private int maxChannelsPerWorker;
+
+    public SelectorWorker(SelectorManager manager) throws IOException {
+        this.manager = manager;
+        selector = Selector.open();
+        maxChannelsPerWorker = manager.getMaxChannelsPerWorker();
+    }
+
+    void incrementUseCounter() {
+        int use = useCounter.getAndIncrement();
+        if (use == 0) {
+            manager.getSelectorExecutor().execute(this);
+        } else if (use + 1 == maxChannelsPerWorker) {
+            manager.onWorkerFullEvent(this);
+        }
+    }
+
+    void decrementUseCounter() {
+        int use = useCounter.getAndDecrement();
+        if (use == 1) {
+            manager.onWorkerEmptyEvent(this);
+        } else if (use == maxChannelsPerWorker) {
+            manager.onWorkerNotFullEvent(this);
+        }
+    }
+
+    boolean isRunning() {
+        return useCounter.get() != 0;
+    }
+
+    public void run() {
+
+        String origName = Thread.currentThread().getName();
+        try {
+            Thread.currentThread().setName("Selector Worker: " + id);
+            while (isRunning()) {
+
+                int count = selector.select(10);
+                if (count == 0)
+                    continue;
+
+                if (!isRunning())
+                    return;
+
+                // Get a java.util.Set containing the SelectionKey objects
+                // for all channels that are ready for I/O.
+                Set keys = selector.selectedKeys();
+
+                for (Iterator i = keys.iterator(); i.hasNext();) {
+                    final SelectionKey key = (SelectionKey)i.next();
+                    i.remove();
+
+                    final SelectorSelection s = (SelectorSelection)key.attachment();
+                    try {
+                        s.disable();
+
+                        // Kick off another thread to find newly selected keys
+                        // while we process the
+                        // currently selected keys
+                        manager.getChannelExecutor().execute(new Runnable() {
+                            public void run() {
+                                try {
+                                    s.onSelect();
+                                    s.enable();
+                                } catch (Throwable e) {
+                                    s.onError(e);
+                                }
+                            }
+                        });
+
+                    } catch (Throwable e) {
+                        s.onError(e);
+                    }
+
+                }
+
+            }
+        } catch (IOException e) {
+
+            // Don't accept any more selections
+            manager.onWorkerEmptyEvent(this);
+
+            // Notify all the selections that the error occurred.
+            Set keys = selector.keys();
+            for (Iterator i = keys.iterator(); i.hasNext();) {
+                SelectionKey key = (SelectionKey)i.next();
+                SelectorSelection s = (SelectorSelection)key.attachment();
+                s.onError(e);
+            }
+
+        } finally {
+            Thread.currentThread().setName(origName);
+        }
+    }
+}

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/peer/PeerTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/peer/PeerTransportFactory.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/peer/PeerTransportFactory.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/peer/PeerTransportFactory.java Wed Aug  8 11:56:59 2007
@@ -43,10 +43,9 @@
     final public static ConcurrentHashMap connectors = new ConcurrentHashMap();
 
     final public static ConcurrentHashMap servers = new ConcurrentHashMap();
-  
+
     private IdGenerator idGenerator = new IdGenerator("peer-");
 
-    
     public Transport doConnect(URI location) throws Exception {
         VMTransportFactory vmTransportFactory = createTransportFactory(location);
         return vmTransportFactory.doConnect(location);
@@ -66,50 +65,50 @@
         try {
             String group = location.getHost();
             String broker = URISupport.stripPrefix(location.getPath(), "/");
-            
-            if( group == null ) {
+
+            if (group == null) {
                 group = "default";
             }
-            if (broker == null || broker.length()==0){
+            if (broker == null || broker.length() == 0) {
                 broker = idGenerator.generateSanitizedId();
             }
-            
+
             final Map brokerOptions = new HashMap(URISupport.parseParamters(location));
-            if (!brokerOptions.containsKey("persistent")){
+            if (!brokerOptions.containsKey("persistent")) {
                 brokerOptions.put("persistent", "false");
             }
-                        
-            final URI finalLocation = new URI("vm://"+broker);
+
+            final URI finalLocation = new URI("vm://" + broker);
             final String finalBroker = broker;
             final String finalGroup = group;
             VMTransportFactory rc = new VMTransportFactory() {
                 public Transport doConnect(URI ignore) throws Exception {
                     return super.doConnect(finalLocation);
                 };
+
                 public Transport doCompositeConnect(URI ignore) throws Exception {
                     return super.doCompositeConnect(finalLocation);
                 };
             };
-            rc.setBrokerFactoryHandler(new BrokerFactoryHandler(){
+            rc.setBrokerFactoryHandler(new BrokerFactoryHandler() {
                 public BrokerService createBroker(URI brokerURI) throws Exception {
                     BrokerService service = new BrokerService();
                     IntrospectionSupport.setProperties(service, brokerOptions);
                     service.setBrokerName(finalBroker);
                     TransportConnector c = service.addConnector("tcp://localhost:0");
-                    c.setDiscoveryUri(new URI("multicast://"+finalGroup));
-                    service.addNetworkConnector("multicast://"+finalGroup);
+                    c.setDiscoveryUri(new URI("multicast://" + finalGroup));
+                    service.addNetworkConnector("multicast://" + finalGroup);
                     return service;
                 }
             });
             return rc;
-            
+
         } catch (URISyntaxException e) {
             throw IOExceptionSupport.create(e);
         }
     }
 
-
-    public TransportServer doBind(String brokerId,URI location) throws IOException {
+    public TransportServer doBind(String brokerId, URI location) throws IOException {
         throw new IOException("This protocol does not support being bound.");
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ReliableTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ReliableTransport.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ReliableTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ReliableTransport.java Wed Aug  8 11:56:59 2007
@@ -54,8 +54,7 @@
         this.replayStrategy = replayStrategy;
     }
 
-    public ReliableTransport(Transport next, UdpTransport udpTransport)
-            throws IOException {
+    public ReliableTransport(Transport next, UdpTransport udpTransport) throws IOException {
         super(next, udpTransport.getSequenceGenerator());
         this.udpTransport = udpTransport;
         this.replayer = udpTransport.createReplayer();
@@ -70,14 +69,13 @@
         replay.setLastNakNumber(toCommandId);
         try {
             oneway(replay);
-        }
-        catch (IOException e) {
+        } catch (IOException e) {
             getTransportListener().onException(e);
         }
     }
 
     public Object request(Object o) throws IOException {
-    	final Command command = (Command) o;
+        final Command command = (Command)o;
         FutureResponse response = asyncRequest(command, null);
         while (true) {
             Response result = response.getResult(requestTimeout);
@@ -89,7 +87,7 @@
     }
 
     public Object request(Object o, int timeout) throws IOException {
-    	final Command command = (Command) o;
+        final Command command = (Command)o;
         FutureResponse response = asyncRequest(command, null);
         while (timeout > 0) {
             int time = timeout;
@@ -107,14 +105,13 @@
     }
 
     public void onCommand(Object o) {
-    	Command command = (Command) o;
+        Command command = (Command)o;
         // lets pass wireformat through
         if (command.isWireFormatInfo()) {
             super.onCommand(command);
             return;
-        }
-        else if (command.getDataStructureType() == ReplayCommand.DATA_STRUCTURE_TYPE) {
-            replayCommands((ReplayCommand) command);
+        } else if (command.getDataStructureType() == ReplayCommand.DATA_STRUCTURE_TYPE) {
+            replayCommands((ReplayCommand)command);
             return;
         }
 
@@ -126,10 +123,10 @@
                 int nextCounter = actualCounter;
                 boolean empty = commands.isEmpty();
                 if (!empty) {
-                    Command nextAvailable = (Command) commands.first();
+                    Command nextAvailable = (Command)commands.first();
                     nextCounter = nextAvailable.getCommandId();
                 }
-                
+
                 try {
                     boolean keep = replayStrategy.onDroppedPackets(this, expectedCounter, actualCounter, nextCounter);
 
@@ -140,15 +137,14 @@
                         }
                         commands.add(command);
                     }
-                }
-                catch (IOException e) {
+                } catch (IOException e) {
                     onException(e);
                 }
 
                 if (!empty) {
                     // lets see if the first item in the set is the next
                     // expected
-                    command = (Command) commands.first();
+                    command = (Command)commands.first();
                     valid = expectedCounter == command.getCommandId();
                     if (valid) {
                         commands.remove(command);
@@ -169,7 +165,7 @@
                 if (valid) {
                     // lets see if the first item in the set is the next
                     // expected
-                    command = (Command) commands.first();
+                    command = (Command)commands.first();
                     valid = expectedCounter == command.getCommandId();
                     if (valid) {
                         commands.remove(command);
@@ -288,8 +284,7 @@
             // TODO we could proactively remove ack'd stuff from the replay
             // buffer
             // if we only have a single client talking to us
-        }
-        catch (IOException e) {
+        } catch (IOException e) {
             onException(e);
         }
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/FrameTranslator.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/FrameTranslator.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/FrameTranslator.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/FrameTranslator.java Wed Aug  8 11:56:59 2007
@@ -10,12 +10,12 @@
 import java.util.HashMap;
 
 /**
- * Implementations of this interface are used to map back and forth from Stomp to ActiveMQ.
- * There are several standard mappings which are semantically the same, the inner class,
- * Helper, provides functions to copy those properties from one to the other
+ * Implementations of this interface are used to map back and forth from Stomp
+ * to ActiveMQ. There are several standard mappings which are semantically the
+ * same, the inner class, Helper, provides functions to copy those properties
+ * from one to the other
  */
-public interface FrameTranslator
-{
+public interface FrameTranslator {
     public ActiveMQMessage convertFrame(StompFrame frame) throws JMSException, ProtocolException;
 
     public StompFrame convertMessage(ActiveMQMessage message) throws IOException, JMSException;
@@ -28,13 +28,8 @@
      * Helper class which holds commonly needed functions used when implementing
      * FrameTranslators
      */
-    public final static class Helper
-    {
-        public static void copyStandardHeadersFromMessageToFrame(ActiveMQMessage message,
-                                                                 StompFrame command,
-                                                                 FrameTranslator ft)
-                throws IOException
-        {
+    public final static class Helper {
+        public static void copyStandardHeadersFromMessageToFrame(ActiveMQMessage message, StompFrame command, FrameTranslator ft) throws IOException {
             final Map headers = command.getHeaders();
             headers.put(Stomp.Headers.Message.DESTINATION, ft.convertDestination(message.getDestination()));
             headers.put(Stomp.Headers.Message.MESSAGE_ID, message.getJMSMessageID());
@@ -42,17 +37,17 @@
             if (message.getJMSCorrelationID() != null) {
                 headers.put(Stomp.Headers.Message.CORRELATION_ID, message.getJMSCorrelationID());
             }
-            headers.put(Stomp.Headers.Message.EXPIRATION_TIME, ""+message.getJMSExpiration());
+            headers.put(Stomp.Headers.Message.EXPIRATION_TIME, "" + message.getJMSExpiration());
 
             if (message.getJMSRedelivered()) {
                 headers.put(Stomp.Headers.Message.REDELIVERED, "true");
             }
-            headers.put(Stomp.Headers.Message.PRORITY, ""+message.getJMSPriority());
+            headers.put(Stomp.Headers.Message.PRORITY, "" + message.getJMSPriority());
 
             if (message.getJMSReplyTo() != null) {
                 headers.put(Stomp.Headers.Message.REPLY_TO, ft.convertDestination(message.getJMSReplyTo()));
             }
-            headers.put(Stomp.Headers.Message.TIMESTAMP, ""+message.getJMSTimestamp());
+            headers.put(Stomp.Headers.Message.TIMESTAMP, "" + message.getJMSTimestamp());
 
             if (message.getJMSType() != null) {
                 headers.put(Stomp.Headers.Message.TYPE, message.getJMSType());
@@ -65,36 +60,32 @@
             }
         }
 
-        public static void copyStandardHeadersFromFrameToMessage(StompFrame command,
-                                                                 ActiveMQMessage msg,
-                                                                 FrameTranslator ft)
-                throws ProtocolException, JMSException
-        {
+        public static void copyStandardHeadersFromFrameToMessage(StompFrame command, ActiveMQMessage msg, FrameTranslator ft) throws ProtocolException, JMSException {
             final Map headers = new HashMap(command.getHeaders());
-            final String destination = (String) headers.remove(Stomp.Headers.Send.DESTINATION);
-            msg.setDestination( ft.convertDestination(destination));
+            final String destination = (String)headers.remove(Stomp.Headers.Send.DESTINATION);
+            msg.setDestination(ft.convertDestination(destination));
 
             // the standard JMS headers
-            msg.setJMSCorrelationID((String) headers.remove(Stomp.Headers.Send.CORRELATION_ID));
+            msg.setJMSCorrelationID((String)headers.remove(Stomp.Headers.Send.CORRELATION_ID));
 
             Object o = headers.remove(Stomp.Headers.Send.EXPIRATION_TIME);
             if (o != null) {
-                msg.setJMSExpiration(Long.parseLong((String) o));
+                msg.setJMSExpiration(Long.parseLong((String)o));
             }
 
             o = headers.remove(Stomp.Headers.Send.PRIORITY);
             if (o != null) {
-                msg.setJMSPriority(Integer.parseInt((String) o));
+                msg.setJMSPriority(Integer.parseInt((String)o));
             }
 
             o = headers.remove(Stomp.Headers.Send.TYPE);
             if (o != null) {
-                msg.setJMSType((String) o);
+                msg.setJMSType((String)o);
             }
 
             o = headers.remove(Stomp.Headers.Send.REPLY_TO);
             if (o != null) {
-                msg.setJMSReplyTo(ft.convertDestination((String) o));
+                msg.setJMSReplyTo(ft.convertDestination((String)o));
             }
 
             o = headers.remove(Stomp.Headers.Send.PERSISTENT);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/LegacyFrameTranslator.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/LegacyFrameTranslator.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/LegacyFrameTranslator.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/LegacyFrameTranslator.java Wed Aug  8 11:56:59 2007
@@ -14,8 +14,7 @@
 /**
  * Implements ActiveMQ 4.0 translations
  */
-public class LegacyFrameTranslator implements FrameTranslator
-{
+public class LegacyFrameTranslator implements FrameTranslator {
     public ActiveMQMessage convertFrame(StompFrame command) throws JMSException, ProtocolException {
         final Map headers = command.getHeaders();
         final ActiveMQMessage msg;
@@ -28,8 +27,7 @@
             ActiveMQTextMessage text = new ActiveMQTextMessage();
             try {
                 text.setText(new String(command.getContent(), "UTF-8"));
-            }
-            catch (Throwable e) {
+            } catch (Throwable e) {
                 throw new ProtocolException("Text could not bet set: " + e, false, e);
             }
             msg = text;
@@ -40,25 +38,25 @@
 
     public StompFrame convertMessage(ActiveMQMessage message) throws IOException, JMSException {
         StompFrame command = new StompFrame();
-		command.setAction(Stomp.Responses.MESSAGE);
+        command.setAction(Stomp.Responses.MESSAGE);
         Map headers = new HashMap(25);
         command.setHeaders(headers);
 
         FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame(message, command, this);
 
-        if( message.getDataStructureType() == ActiveMQTextMessage.DATA_STRUCTURE_TYPE ) {
+        if (message.getDataStructureType() == ActiveMQTextMessage.DATA_STRUCTURE_TYPE) {
 
             ActiveMQTextMessage msg = (ActiveMQTextMessage)message.copy();
             command.setContent(msg.getText().getBytes("UTF-8"));
 
-        } else if( message.getDataStructureType() == ActiveMQBytesMessage.DATA_STRUCTURE_TYPE ) {
+        } else if (message.getDataStructureType() == ActiveMQBytesMessage.DATA_STRUCTURE_TYPE) {
 
-        	ActiveMQBytesMessage msg = (ActiveMQBytesMessage)message.copy();
-        	msg.setReadOnlyBody(true);
+            ActiveMQBytesMessage msg = (ActiveMQBytesMessage)message.copy();
+            msg.setReadOnlyBody(true);
             byte[] data = new byte[(int)msg.getBodyLength()];
             msg.readBytes(data);
 
-            headers.put(Stomp.Headers.CONTENT_LENGTH, ""+data.length);
+            headers.put(Stomp.Headers.CONTENT_LENGTH, "" + data.length);
             command.setContent(data);
         }
         return command;
@@ -68,23 +66,20 @@
         if (d == null) {
             return null;
         }
-        ActiveMQDestination amq_d = (ActiveMQDestination) d;
+        ActiveMQDestination amq_d = (ActiveMQDestination)d;
         String p_name = amq_d.getPhysicalName();
 
         StringBuffer buffer = new StringBuffer();
         if (amq_d.isQueue()) {
             if (amq_d.isTemporary()) {
                 buffer.append("/temp-queue/");
-            }
-            else {
+            } else {
                 buffer.append("/queue/");
             }
-        }
-        else {
+        } else {
             if (amq_d.isTemporary()) {
                 buffer.append("/temp-topic/");
-            }
-            else {
+            } else {
                 buffer.append("/topic/");
             }
         }
@@ -95,26 +90,21 @@
     public ActiveMQDestination convertDestination(String name) throws ProtocolException {
         if (name == null) {
             return null;
-        }
-        else if (name.startsWith("/queue/")) {
+        } else if (name.startsWith("/queue/")) {
             String q_name = name.substring("/queue/".length(), name.length());
             return ActiveMQDestination.createDestination(q_name, ActiveMQDestination.QUEUE_TYPE);
-        }
-        else if (name.startsWith("/topic/")) {
+        } else if (name.startsWith("/topic/")) {
             String t_name = name.substring("/topic/".length(), name.length());
             return ActiveMQDestination.createDestination(t_name, ActiveMQDestination.TOPIC_TYPE);
-        }
-        else if (name.startsWith("/temp-queue/")) {
+        } else if (name.startsWith("/temp-queue/")) {
             String t_name = name.substring("/temp-queue/".length(), name.length());
             return ActiveMQDestination.createDestination(t_name, ActiveMQDestination.TEMP_QUEUE_TYPE);
-        }
-        else if (name.startsWith("/temp-topic/")) {
+        } else if (name.startsWith("/temp-topic/")) {
             String t_name = name.substring("/temp-topic/".length(), name.length());
             return ActiveMQDestination.createDestination(t_name, ActiveMQDestination.TEMP_TOPIC_TYPE);
-        }
-        else {
-            throw new ProtocolException("Illegal destination name: [" + name + "] -- ActiveMQ STOMP destinations " +
-                                        "must begine with one of: /queue/ /topic/ /temp-queue/ /temp-topic/");
+        } else {
+            throw new ProtocolException("Illegal destination name: [" + name + "] -- ActiveMQ STOMP destinations "
+                                        + "must begine with one of: /queue/ /topic/ /temp-queue/ /temp-topic/");
         }
     }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java Wed Aug  8 11:56:59 2007
@@ -56,7 +56,6 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
- *
  * @author <a href="http://hiramchirino.com">chirino</a>
  */
 public class ProtocolConverter {
@@ -73,117 +72,117 @@
     private final ConcurrentHashMap resposeHandlers = new ConcurrentHashMap();
     private final ConcurrentHashMap subscriptionsByConsumerId = new ConcurrentHashMap();
     private final Map transactions = new ConcurrentHashMap();
-	private final StompTransportFilter transportFilter;
+    private final StompTransportFilter transportFilter;
 
-	private final Object commnadIdMutex = new Object();
-	private int lastCommandId;
+    private final Object commnadIdMutex = new Object();
+    private int lastCommandId;
     private final AtomicBoolean connected = new AtomicBoolean(false);
     private final FrameTranslator frameTranslator;
 
-    public ProtocolConverter(StompTransportFilter stompTransportFilter, FrameTranslator translator)
-    {
+    public ProtocolConverter(StompTransportFilter stompTransportFilter, FrameTranslator translator) {
         this.transportFilter = stompTransportFilter;
         this.frameTranslator = translator;
     }
 
     protected int generateCommandId() {
-    	synchronized(commnadIdMutex){
-    		return lastCommandId++;
-    	}
+        synchronized (commnadIdMutex) {
+            return lastCommandId++;
+        }
     }
 
-    protected ResponseHandler createResponseHandler(StompFrame command){
-        final String receiptId = (String) command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
+    protected ResponseHandler createResponseHandler(StompFrame command) {
+        final String receiptId = (String)command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
         // A response may not be needed.
-        if( receiptId != null ) {
-	        return new ResponseHandler() {
-	    		public void onResponse(ProtocolConverter converter, Response response) throws IOException {
-	                StompFrame sc = new StompFrame();
-	                sc.setAction(Stomp.Responses.RECEIPT);
-	                sc.setHeaders(new HashMap(1));
-	                sc.getHeaders().put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
-	        		transportFilter.sendToStomp(sc);
-	    		}
-	        };
-	    }
-    	return null;
-    }
-
-	protected void sendToActiveMQ(Command command, ResponseHandler handler) {
-		command.setCommandId(generateCommandId());
-		if(handler!=null) {
-			command.setResponseRequired(true);
-			resposeHandlers.put(Integer.valueOf(command.getCommandId()), handler);
-		}
-		transportFilter.sendToActiveMQ(command);
-	}
-
-	protected void sendToStomp(StompFrame command) throws IOException {
-		transportFilter.sendToStomp(command);
-	}
+        if (receiptId != null) {
+            return new ResponseHandler() {
+                public void onResponse(ProtocolConverter converter, Response response) throws IOException {
+                    StompFrame sc = new StompFrame();
+                    sc.setAction(Stomp.Responses.RECEIPT);
+                    sc.setHeaders(new HashMap(1));
+                    sc.getHeaders().put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
+                    transportFilter.sendToStomp(sc);
+                }
+            };
+        }
+        return null;
+    }
+
+    protected void sendToActiveMQ(Command command, ResponseHandler handler) {
+        command.setCommandId(generateCommandId());
+        if (handler != null) {
+            command.setResponseRequired(true);
+            resposeHandlers.put(Integer.valueOf(command.getCommandId()), handler);
+        }
+        transportFilter.sendToActiveMQ(command);
+    }
 
-	/**
+    protected void sendToStomp(StompFrame command) throws IOException {
+        transportFilter.sendToStomp(command);
+    }
+
+    /**
      * Convert a stomp command
+     * 
      * @param command
      */
-	public void onStompCommad( StompFrame command ) throws IOException, JMSException {
-		try {
+    public void onStompCommad(StompFrame command) throws IOException, JMSException {
+        try {
 
-			if( command.getClass() == StompFrameError.class ) {
-				throw ((StompFrameError)command).getException();
-			}
-
-			String action = command.getAction();
-	        if (action.startsWith(Stomp.Commands.SEND))
-	            onStompSend(command);
-	        else if (action.startsWith(Stomp.Commands.ACK))
-	            onStompAck(command);
-	        else if (action.startsWith(Stomp.Commands.BEGIN))
-	            onStompBegin(command);
-	        else if (action.startsWith(Stomp.Commands.COMMIT))
-	            onStompCommit(command);
-	        else if (action.startsWith(Stomp.Commands.ABORT))
-	            onStompAbort(command);
-	        else if (action.startsWith(Stomp.Commands.SUBSCRIBE))
-	            onStompSubscribe(command);
-	        else if (action.startsWith(Stomp.Commands.UNSUBSCRIBE))
-	            onStompUnsubscribe(command);
-			else if (action.startsWith(Stomp.Commands.CONNECT))
-	            onStompConnect(command);
-	        else if (action.startsWith(Stomp.Commands.DISCONNECT))
-	            onStompDisconnect(command);
-	        else
-	        	throw new ProtocolException("Unknown STOMP action: "+action);
+            if (command.getClass() == StompFrameError.class) {
+                throw ((StompFrameError)command).getException();
+            }
+
+            String action = command.getAction();
+            if (action.startsWith(Stomp.Commands.SEND))
+                onStompSend(command);
+            else if (action.startsWith(Stomp.Commands.ACK))
+                onStompAck(command);
+            else if (action.startsWith(Stomp.Commands.BEGIN))
+                onStompBegin(command);
+            else if (action.startsWith(Stomp.Commands.COMMIT))
+                onStompCommit(command);
+            else if (action.startsWith(Stomp.Commands.ABORT))
+                onStompAbort(command);
+            else if (action.startsWith(Stomp.Commands.SUBSCRIBE))
+                onStompSubscribe(command);
+            else if (action.startsWith(Stomp.Commands.UNSUBSCRIBE))
+                onStompUnsubscribe(command);
+            else if (action.startsWith(Stomp.Commands.CONNECT))
+                onStompConnect(command);
+            else if (action.startsWith(Stomp.Commands.DISCONNECT))
+                onStompDisconnect(command);
+            else
+                throw new ProtocolException("Unknown STOMP action: " + action);
 
         } catch (ProtocolException e) {
 
-        	// Let the stomp client know about any protocol errors.
-        	ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        	PrintWriter stream = new PrintWriter(new OutputStreamWriter(baos,"UTF-8"));
-        	e.printStackTrace(stream);
-        	stream.close();
-
-        	HashMap headers = new HashMap();
-        	headers.put(Stomp.Headers.Error.MESSAGE, e.getMessage());
-
-            final String receiptId = (String) command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
-            if( receiptId != null ) {
-            	headers.put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
+            // Let the stomp client know about any protocol errors.
+            ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            PrintWriter stream = new PrintWriter(new OutputStreamWriter(baos, "UTF-8"));
+            e.printStackTrace(stream);
+            stream.close();
+
+            HashMap headers = new HashMap();
+            headers.put(Stomp.Headers.Error.MESSAGE, e.getMessage());
+
+            final String receiptId = (String)command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
+            if (receiptId != null) {
+                headers.put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
             }
 
-        	StompFrame errorMessage = new StompFrame(Stomp.Responses.ERROR,headers,baos.toByteArray());
-			sendToStomp(errorMessage);
+            StompFrame errorMessage = new StompFrame(Stomp.Responses.ERROR, headers, baos.toByteArray());
+            sendToStomp(errorMessage);
 
-			if( e.isFatal() )
-				getTransportFilter().onException(e);
+            if (e.isFatal())
+                getTransportFilter().onException(e);
         }
-	}
+    }
 
-	protected void onStompSend(StompFrame command) throws IOException, JMSException {
-		checkConnected();
+    protected void onStompSend(StompFrame command) throws IOException, JMSException {
+        checkConnected();
 
-    	Map headers = command.getHeaders();
-        String stompTx = (String) headers.get(Stomp.Headers.TRANSACTION);
+        Map headers = command.getHeaders();
+        String stompTx = (String)headers.get(Stomp.Headers.TRANSACTION);
 
         ActiveMQMessage message = convertMessage(command);
 
@@ -192,61 +191,61 @@
         message.setMessageId(id);
         message.setJMSTimestamp(System.currentTimeMillis());
 
-        if (stompTx!=null) {
-        	TransactionId activemqTx = (TransactionId) transactions.get(stompTx);
+        if (stompTx != null) {
+            TransactionId activemqTx = (TransactionId)transactions.get(stompTx);
             if (activemqTx == null)
-                throw new ProtocolException("Invalid transaction id: "+stompTx);
+                throw new ProtocolException("Invalid transaction id: " + stompTx);
             message.setTransactionId(activemqTx);
         }
 
         message.onSend();
-		sendToActiveMQ(message, createResponseHandler(command));
-
-	}
+        sendToActiveMQ(message, createResponseHandler(command));
 
+    }
 
     protected void onStompAck(StompFrame command) throws ProtocolException {
-		checkConnected();
+        checkConnected();
 
-    	// TODO: acking with just a message id is very bogus
-    	// since the same message id could have been sent to 2 different subscriptions
-    	// on the same stomp connection. For example, when 2 subs are created on the same topic.
+        // TODO: acking with just a message id is very bogus
+        // since the same message id could have been sent to 2 different
+        // subscriptions
+        // on the same stomp connection. For example, when 2 subs are created on
+        // the same topic.
 
-    	Map headers = command.getHeaders();
-        String messageId = (String) headers.get(Stomp.Headers.Ack.MESSAGE_ID);
+        Map headers = command.getHeaders();
+        String messageId = (String)headers.get(Stomp.Headers.Ack.MESSAGE_ID);
         if (messageId == null)
             throw new ProtocolException("ACK received without a message-id to acknowledge!");
 
-        TransactionId activemqTx=null;
-        String stompTx = (String) headers.get(Stomp.Headers.TRANSACTION);
-        if (stompTx!=null) {
-        	activemqTx = (TransactionId) transactions.get(stompTx);
+        TransactionId activemqTx = null;
+        String stompTx = (String)headers.get(Stomp.Headers.TRANSACTION);
+        if (stompTx != null) {
+            activemqTx = (TransactionId)transactions.get(stompTx);
             if (activemqTx == null)
-                throw new ProtocolException("Invalid transaction id: "+stompTx);
+                throw new ProtocolException("Invalid transaction id: " + stompTx);
         }
 
-        boolean acked=false;
+        boolean acked = false;
         for (Iterator iter = subscriptionsByConsumerId.values().iterator(); iter.hasNext();) {
-			StompSubscription sub = (StompSubscription) iter.next();
-			MessageAck ack = sub.onStompMessageAck(messageId);
-			if( ack!=null ) {
-		        ack.setTransactionId(activemqTx);
-		        sendToActiveMQ(ack,createResponseHandler(command));
-		        acked=true;
-		        break;
-			}
-		}
-
-        if( !acked )
-        	throw new ProtocolException("Unexpected ACK received for message-id [" + messageId + "]");
+            StompSubscription sub = (StompSubscription)iter.next();
+            MessageAck ack = sub.onStompMessageAck(messageId);
+            if (ack != null) {
+                ack.setTransactionId(activemqTx);
+                sendToActiveMQ(ack, createResponseHandler(command));
+                acked = true;
+                break;
+            }
+        }
 
-	}
+        if (!acked)
+            throw new ProtocolException("Unexpected ACK received for message-id [" + messageId + "]");
 
+    }
 
-	protected void onStompBegin(StompFrame command) throws ProtocolException {
-		checkConnected();
+    protected void onStompBegin(StompFrame command) throws ProtocolException {
+        checkConnected();
 
-		Map headers = command.getHeaders();
+        Map headers = command.getHeaders();
 
         String stompTx = (String)headers.get(Stomp.Headers.TRANSACTION);
 
@@ -254,8 +253,8 @@
             throw new ProtocolException("Must specify the transaction you are beginning");
         }
 
-        if( transactions.get(stompTx)!=null  ) {
-            throw new ProtocolException("The transaction was allready started: "+stompTx);
+        if (transactions.get(stompTx) != null) {
+            throw new ProtocolException("The transaction was allready started: " + stompTx);
         }
 
         LocalTransactionId activemqTx = new LocalTransactionId(connectionId, transactionIdGenerator.getNextSequenceId());
@@ -266,23 +265,23 @@
         tx.setTransactionId(activemqTx);
         tx.setType(TransactionInfo.BEGIN);
 
-		sendToActiveMQ(tx, createResponseHandler(command));
+        sendToActiveMQ(tx, createResponseHandler(command));
 
-	}
+    }
 
-	protected void onStompCommit(StompFrame command) throws ProtocolException {
-		checkConnected();
+    protected void onStompCommit(StompFrame command) throws ProtocolException {
+        checkConnected();
 
-		Map headers = command.getHeaders();
+        Map headers = command.getHeaders();
 
-        String stompTx = (String) headers.get(Stomp.Headers.TRANSACTION);
-        if (stompTx==null) {
+        String stompTx = (String)headers.get(Stomp.Headers.TRANSACTION);
+        if (stompTx == null) {
             throw new ProtocolException("Must specify the transaction you are committing");
         }
 
-        TransactionId activemqTx = (TransactionId) transactions.remove(stompTx);
+        TransactionId activemqTx = (TransactionId)transactions.remove(stompTx);
         if (activemqTx == null) {
-            throw new ProtocolException("Invalid transaction id: "+stompTx);
+            throw new ProtocolException("Invalid transaction id: " + stompTx);
         }
 
         TransactionInfo tx = new TransactionInfo();
@@ -290,21 +289,21 @@
         tx.setTransactionId(activemqTx);
         tx.setType(TransactionInfo.COMMIT_ONE_PHASE);
 
-		sendToActiveMQ(tx, createResponseHandler(command));
-	}
+        sendToActiveMQ(tx, createResponseHandler(command));
+    }
 
-	protected void onStompAbort(StompFrame command) throws ProtocolException {
-		checkConnected();
-    	Map headers = command.getHeaders();
+    protected void onStompAbort(StompFrame command) throws ProtocolException {
+        checkConnected();
+        Map headers = command.getHeaders();
 
-        String stompTx = (String) headers.get(Stomp.Headers.TRANSACTION);
-        if (stompTx==null) {
+        String stompTx = (String)headers.get(Stomp.Headers.TRANSACTION);
+        if (stompTx == null) {
             throw new ProtocolException("Must specify the transaction you are committing");
         }
 
-        TransactionId activemqTx = (TransactionId) transactions.remove(stompTx);
+        TransactionId activemqTx = (TransactionId)transactions.remove(stompTx);
         if (activemqTx == null) {
-            throw new ProtocolException("Invalid transaction id: "+stompTx);
+            throw new ProtocolException("Invalid transaction id: " + stompTx);
         }
 
         TransactionInfo tx = new TransactionInfo();
@@ -312,13 +311,13 @@
         tx.setTransactionId(activemqTx);
         tx.setType(TransactionInfo.ROLLBACK);
 
-		sendToActiveMQ(tx, createResponseHandler(command));
+        sendToActiveMQ(tx, createResponseHandler(command));
 
-	}
+    }
 
-	protected void onStompSubscribe(StompFrame command) throws ProtocolException {
-		checkConnected();
-    	Map headers = command.getHeaders();
+    protected void onStompSubscribe(StompFrame command) throws ProtocolException {
+        checkConnected();
+        Map headers = command.getHeaders();
 
         String subscriptionId = (String)headers.get(Stomp.Headers.Subscribe.ID);
         String destination = (String)headers.get(Stomp.Headers.Subscribe.DESTINATION);
@@ -329,7 +328,7 @@
         consumerInfo.setPrefetchSize(1000);
         consumerInfo.setDispatchAsync(true);
 
-        String selector = (String) headers.remove(Stomp.Headers.Subscribe.SELECTOR);
+        String selector = (String)headers.remove(Stomp.Headers.Subscribe.SELECTOR);
         consumerInfo.setSelector(selector);
 
         IntrospectionSupport.setProperties(consumerInfo, headers, "activemq.");
@@ -347,50 +346,49 @@
         }
 
         subscriptionsByConsumerId.put(id, stompSubscription);
-		sendToActiveMQ(consumerInfo, createResponseHandler(command));
+        sendToActiveMQ(consumerInfo, createResponseHandler(command));
 
-	}
+    }
 
-	protected void onStompUnsubscribe(StompFrame command) throws ProtocolException {
-		checkConnected();
-    	Map headers = command.getHeaders();
+    protected void onStompUnsubscribe(StompFrame command) throws ProtocolException {
+        checkConnected();
+        Map headers = command.getHeaders();
 
-        ActiveMQDestination destination=null;
+        ActiveMQDestination destination = null;
         Object o = headers.get(Stomp.Headers.Unsubscribe.DESTINATION);
-        if( o!=null )
-        	destination = frameTranslator.convertDestination((String) o);
+        if (o != null)
+            destination = frameTranslator.convertDestination((String)o);
 
         String subscriptionId = (String)headers.get(Stomp.Headers.Unsubscribe.ID);
 
-        if (subscriptionId==null && destination==null) {
+        if (subscriptionId == null && destination == null) {
             throw new ProtocolException("Must specify the subscriptionId or the destination you are unsubscribing from");
         }
 
-        // TODO: Unsubscribing using a destination is a bit wierd if multiple subscriptions
-        // are created with the same destination.  Perhaps this should be removed.
+        // TODO: Unsubscribing using a destination is a bit wierd if multiple
+        // subscriptions
+        // are created with the same destination. Perhaps this should be
+        // removed.
         //
         for (Iterator iter = subscriptionsByConsumerId.values().iterator(); iter.hasNext();) {
-			StompSubscription sub = (StompSubscription) iter.next();
-			if (
-				(subscriptionId!=null && subscriptionId.equals(sub.getSubscriptionId()) ) ||
-				(destination!=null && destination.equals(sub.getDestination()) )
-			) {
-		        sendToActiveMQ(sub.getConsumerInfo().createRemoveCommand(), createResponseHandler(command));
-				iter.remove();
+            StompSubscription sub = (StompSubscription)iter.next();
+            if ((subscriptionId != null && subscriptionId.equals(sub.getSubscriptionId())) || (destination != null && destination.equals(sub.getDestination()))) {
+                sendToActiveMQ(sub.getConsumerInfo().createRemoveCommand(), createResponseHandler(command));
+                iter.remove();
                 return;
-			}
-		}
+            }
+        }
 
         throw new ProtocolException("No subscription matched.");
-	}
+    }
 
-	protected void onStompConnect(StompFrame command) throws ProtocolException {
+    protected void onStompConnect(StompFrame command) throws ProtocolException {
 
-		if(connected.get()) {
-			throw new ProtocolException("Allready connected.");
-		}
+        if (connected.get()) {
+            throw new ProtocolException("Allready connected.");
+        }
 
-    	final Map headers = command.getHeaders();
+        final Map headers = command.getHeaders();
 
         // allow anyone to login for now
         String login = (String)headers.get(Stomp.Headers.Connect.LOGIN);
@@ -402,100 +400,99 @@
         IntrospectionSupport.setProperties(connectionInfo, headers, "activemq.");
 
         connectionInfo.setConnectionId(connectionId);
-        if( clientId!=null )
+        if (clientId != null)
             connectionInfo.setClientId(clientId);
         else
-            connectionInfo.setClientId(""+connectionInfo.getConnectionId().toString());
+            connectionInfo.setClientId("" + connectionInfo.getConnectionId().toString());
 
         connectionInfo.setResponseRequired(true);
         connectionInfo.setUserName(login);
         connectionInfo.setPassword(passcode);
 
-		sendToActiveMQ(connectionInfo, new ResponseHandler(){
-			public void onResponse(ProtocolConverter converter, Response response) throws IOException {
+        sendToActiveMQ(connectionInfo, new ResponseHandler() {
+            public void onResponse(ProtocolConverter converter, Response response) throws IOException {
 
-	            final SessionInfo sessionInfo = new SessionInfo(sessionId);
-	            sendToActiveMQ(sessionInfo,null);
+                final SessionInfo sessionInfo = new SessionInfo(sessionId);
+                sendToActiveMQ(sessionInfo, null);
 
+                final ProducerInfo producerInfo = new ProducerInfo(producerId);
+                sendToActiveMQ(producerInfo, new ResponseHandler() {
+                    public void onResponse(ProtocolConverter converter, Response response) throws IOException {
 
-	            final ProducerInfo producerInfo = new ProducerInfo(producerId);
-	            sendToActiveMQ(producerInfo,new ResponseHandler(){
-					public void onResponse(ProtocolConverter converter, Response response) throws IOException {
+                        connected.set(true);
+                        HashMap responseHeaders = new HashMap();
 
-						connected.set(true);
-	                    HashMap responseHeaders = new HashMap();
-
-	                    responseHeaders.put(Stomp.Headers.Connected.SESSION, connectionInfo.getClientId());
-	                    String requestId = (String) headers.get(Stomp.Headers.Connect.REQUEST_ID);
+                        responseHeaders.put(Stomp.Headers.Connected.SESSION, connectionInfo.getClientId());
+                        String requestId = (String)headers.get(Stomp.Headers.Connect.REQUEST_ID);
                         if (requestId == null) {
                             // TODO legacy
-                            requestId = (String) headers.get(Stomp.Headers.RECEIPT_REQUESTED);
+                            requestId = (String)headers.get(Stomp.Headers.RECEIPT_REQUESTED);
                         }
-	                    if( requestId !=null ){
+                        if (requestId != null) {
                             // TODO legacy
-		                    responseHeaders.put(Stomp.Headers.Connected.RESPONSE_ID, requestId);
+                            responseHeaders.put(Stomp.Headers.Connected.RESPONSE_ID, requestId);
                             responseHeaders.put(Stomp.Headers.Response.RECEIPT_ID, requestId);
-	            		}
+                        }
+
+                        StompFrame sc = new StompFrame();
+                        sc.setAction(Stomp.Responses.CONNECTED);
+                        sc.setHeaders(responseHeaders);
+                        sendToStomp(sc);
+                    }
+                });
 
-	                    StompFrame sc = new StompFrame();
-	                    sc.setAction(Stomp.Responses.CONNECTED);
-	                    sc.setHeaders(responseHeaders);
-	                    sendToStomp(sc);
-					}
-				});
-
-			}
-		});
-	}
-
-	protected void onStompDisconnect(StompFrame command) throws ProtocolException {
-		checkConnected();
-		sendToActiveMQ(new ShutdownInfo(), createResponseHandler(command));
-		connected.set(false);
-	}
-
-
-	protected void checkConnected() throws ProtocolException {
-		if(!connected.get()) {
-			throw new ProtocolException("Not connected.");
-		}
-	}
+            }
+        });
+    }
 
-	/**
+    protected void onStompDisconnect(StompFrame command) throws ProtocolException {
+        checkConnected();
+        sendToActiveMQ(new ShutdownInfo(), createResponseHandler(command));
+        connected.set(false);
+    }
+
+    protected void checkConnected() throws ProtocolException {
+        if (!connected.get()) {
+            throw new ProtocolException("Not connected.");
+        }
+    }
+
+    /**
      * Dispatch a ActiveMQ command
+     * 
      * @param command
      * @throws IOException
      */
-	public void onActiveMQCommad( Command command ) throws IOException, JMSException {
+    public void onActiveMQCommad(Command command) throws IOException, JMSException {
 
-    	if ( command.isResponse() ) {
+        if (command.isResponse()) {
 
-			Response response = (Response) command;
-		    ResponseHandler rh = (ResponseHandler) resposeHandlers.remove(Integer.valueOf(response.getCorrelationId()));
-		    if( rh !=null ) {
-		    	rh.onResponse(this, response);
-		    }
-
-		} else if( command.isMessageDispatch() ) {
-
-		    MessageDispatch md = (MessageDispatch)command;
-		    StompSubscription sub = (StompSubscription) subscriptionsByConsumerId.get(md.getConsumerId());
-		    if (sub != null) {
-		        sub.onMessageDispatch(md);
+            Response response = (Response)command;
+            ResponseHandler rh = (ResponseHandler)resposeHandlers.remove(Integer.valueOf(response.getCorrelationId()));
+            if (rh != null) {
+                rh.onResponse(this, response);
+            }
+
+        } else if (command.isMessageDispatch()) {
+
+            MessageDispatch md = (MessageDispatch)command;
+            StompSubscription sub = (StompSubscription)subscriptionsByConsumerId.get(md.getConsumerId());
+            if (sub != null) {
+                sub.onMessageDispatch(md);
             }
         }
-	}
+    }
 
-    public  ActiveMQMessage convertMessage(StompFrame command) throws IOException, JMSException {
+    public ActiveMQMessage convertMessage(StompFrame command) throws IOException, JMSException {
         ActiveMQMessage msg = frameTranslator.convertFrame(command);
         return msg;
     }
 
-	public StompFrame convertMessage(ActiveMQMessage message) throws IOException, JMSException {
-		return frameTranslator.convertMessage(message);
+    public StompFrame convertMessage(ActiveMQMessage message) throws IOException, JMSException {
+        return frameTranslator.convertMessage(message);
     }
 
-	public StompTransportFilter getTransportFilter() {
-		return transportFilter;
-	}
+    public StompTransportFilter getTransportFilter() {
+        return transportFilter;
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolException.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolException.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolException.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolException.java Wed Aug  8 11:56:59 2007
@@ -19,32 +19,34 @@
 import java.io.IOException;
 
 /**
- * 
- * @author <a href="http://hiramchirino.com">chirino</a> 
+ * @author <a href="http://hiramchirino.com">chirino</a>
  */
 public class ProtocolException extends IOException {
 
-	private static final long serialVersionUID = -2869735532997332242L;
-	
-	private final boolean fatal;
-
-	public ProtocolException() {
-		this(null);
-	}
-	public ProtocolException(String s) {
-		this(s, false);
-	}
-	public ProtocolException(String s, boolean fatal) {
-		this(s,fatal, null);
-	}
-	public ProtocolException(String s, boolean fatal, Throwable cause) {
-		super(s);
-		this.fatal = fatal;
-		initCause(cause);
-	}
-	
-	public boolean isFatal() {
-		return fatal;
-	}
+    private static final long serialVersionUID = -2869735532997332242L;
+
+    private final boolean fatal;
+
+    public ProtocolException() {
+        this(null);
+    }
+
+    public ProtocolException(String s) {
+        this(s, false);
+    }
+
+    public ProtocolException(String s, boolean fatal) {
+        this(s, fatal, null);
+    }
+
+    public ProtocolException(String s, boolean fatal, Throwable cause) {
+        super(s);
+        this.fatal = fatal;
+        initCause(cause);
+    }
+
+    public boolean isFatal() {
+        return fatal;
+    }
 
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompFrame.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompFrame.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompFrame.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompFrame.java Wed Aug  8 11:56:59 2007
@@ -30,144 +30,144 @@
 /**
  * Represents all the data in a STOMP frame.
  * 
- * @author <a href="http://hiramchirino.com">chirino</a> 
+ * @author <a href="http://hiramchirino.com">chirino</a>
  */
 public class StompFrame implements Command {
 
-    private static final byte[] NO_DATA = new byte[]{};
+    private static final byte[] NO_DATA = new byte[] {};
 
-	private String action;
-	private Map headers = Collections.EMPTY_MAP;
-	private byte[] content = NO_DATA;
-
-	public StompFrame(String command, HashMap headers, byte[] data) {
-		this.action = command;
-		this.headers = headers;
-		this.content = data;
-	}
-
-	public StompFrame() {
-	}
-
-	public String getAction() {
-		return action;
-	}
-
-	public void setAction(String command) {
-		this.action = command;
-	}
-
-	public byte[] getContent() {
-		return content;
-	}
-
-	public void setContent(byte[] data) {
-		this.content = data;
-	}
-
-	public Map getHeaders() {
-		return headers;
-	}
-
-	public void setHeaders(Map headers) {
-		this.headers = headers;
-	}
-
-	//
-	// Methods in the Command interface
-	//
-	public int getCommandId() {
-		return 0;
-	}
-
-	public Endpoint getFrom() {
-		return null;
-	}
-
-	public Endpoint getTo() {
-		return null;
-	}
-
-	public boolean isBrokerInfo() {
-		return false;
-	}
-
-	public boolean isMessage() {
-		return false;
-	}
-
-	public boolean isMessageAck() {
-		return false;
-	}
-
-	public boolean isMessageDispatch() {
-		return false;
-	}
-
-	public boolean isMessageDispatchNotification() {
-		return false;
-	}
-
-	public boolean isResponse() {
-		return false;
-	}
-
-	public boolean isResponseRequired() {
-		return false;
-	}
-
-	public boolean isShutdownInfo() {
-		return false;
-	}
-
-	public boolean isWireFormatInfo() {
-		return false;
-	}
-
-	public void setCommandId(int value) {
-	}
-
-	public void setFrom(Endpoint from) {
-	}
-
-	public void setResponseRequired(boolean responseRequired) {
-	}
-
-	public void setTo(Endpoint to) {
-	}
-
-	public Response visit(CommandVisitor visitor) throws Exception {
-		return null;
-	}
-
-	public byte getDataStructureType() {
-		return 0;
-	}
-
-	public boolean isMarshallAware() {
-		return false;
-	}
-
-	public String toString() {
-		StringBuffer buffer = new StringBuffer();
-		buffer.append(getAction());
-		buffer.append("\n");
-		Map headers = getHeaders();
-		for (Iterator iter = headers.entrySet().iterator(); iter.hasNext();) {
-			Map.Entry entry = (Map.Entry) iter.next();
-			buffer.append(entry.getKey());
-			buffer.append(":");
-			buffer.append(entry.getValue());
-			buffer.append("\n");
-		}
-		buffer.append("\n");
-		if( getContent()!=null ) {
-			try {
-				buffer.append(new String(getContent()));
-			} catch (Throwable e) {
-				buffer.append(Arrays.toString(getContent()));
-			}
-		}
-		return buffer.toString();
-	}
+    private String action;
+    private Map headers = Collections.EMPTY_MAP;
+    private byte[] content = NO_DATA;
+
+    public StompFrame(String command, HashMap headers, byte[] data) {
+        this.action = command;
+        this.headers = headers;
+        this.content = data;
+    }
+
+    public StompFrame() {
+    }
+
+    public String getAction() {
+        return action;
+    }
+
+    public void setAction(String command) {
+        this.action = command;
+    }
+
+    public byte[] getContent() {
+        return content;
+    }
+
+    public void setContent(byte[] data) {
+        this.content = data;
+    }
+
+    public Map getHeaders() {
+        return headers;
+    }
+
+    public void setHeaders(Map headers) {
+        this.headers = headers;
+    }
+
+    //
+    // Methods in the Command interface
+    //
+    public int getCommandId() {
+        return 0;
+    }
+
+    public Endpoint getFrom() {
+        return null;
+    }
+
+    public Endpoint getTo() {
+        return null;
+    }
+
+    public boolean isBrokerInfo() {
+        return false;
+    }
+
+    public boolean isMessage() {
+        return false;
+    }
+
+    public boolean isMessageAck() {
+        return false;
+    }
+
+    public boolean isMessageDispatch() {
+        return false;
+    }
+
+    public boolean isMessageDispatchNotification() {
+        return false;
+    }
+
+    public boolean isResponse() {
+        return false;
+    }
+
+    public boolean isResponseRequired() {
+        return false;
+    }
+
+    public boolean isShutdownInfo() {
+        return false;
+    }
+
+    public boolean isWireFormatInfo() {
+        return false;
+    }
+
+    public void setCommandId(int value) {
+    }
+
+    public void setFrom(Endpoint from) {
+    }
+
+    public void setResponseRequired(boolean responseRequired) {
+    }
+
+    public void setTo(Endpoint to) {
+    }
+
+    public Response visit(CommandVisitor visitor) throws Exception {
+        return null;
+    }
+
+    public byte getDataStructureType() {
+        return 0;
+    }
+
+    public boolean isMarshallAware() {
+        return false;
+    }
+
+    public String toString() {
+        StringBuffer buffer = new StringBuffer();
+        buffer.append(getAction());
+        buffer.append("\n");
+        Map headers = getHeaders();
+        for (Iterator iter = headers.entrySet().iterator(); iter.hasNext();) {
+            Map.Entry entry = (Map.Entry)iter.next();
+            buffer.append(entry.getKey());
+            buffer.append(":");
+            buffer.append(entry.getValue());
+            buffer.append("\n");
+        }
+        buffer.append("\n");
+        if (getContent() != null) {
+            try {
+                buffer.append(new String(getContent()));
+            } catch (Throwable e) {
+                buffer.append(Arrays.toString(getContent()));
+            }
+        }
+        return buffer.toString();
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompFrameError.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompFrameError.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompFrameError.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompFrameError.java Wed Aug  8 11:56:59 2007
@@ -19,20 +19,18 @@
 /**
  * Command indicating that an invalid Stomp Frame was received.
  * 
- * @author <a href="http://hiramchirino.com">chirino</a> 
+ * @author <a href="http://hiramchirino.com">chirino</a>
  */
 public class StompFrameError extends StompFrame {
 
+    private final ProtocolException exception;
 
-	private final ProtocolException exception;
-
-	public StompFrameError(ProtocolException exception) {
-		this.exception = exception;
-	}
-
-	public ProtocolException getException() {
-		return exception;
-	}
+    public StompFrameError(ProtocolException exception) {
+        this.exception = exception;
+    }
 
+    public ProtocolException getException() {
+        return exception;
+    }
 
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java Wed Aug  8 11:56:59 2007
@@ -33,110 +33,109 @@
 
 /**
  * Keeps track of the STOMP susbscription so that acking is correctly done.
- *  
- * @author <a href="http://hiramchirino.com">chirino</a> 
+ * 
+ * @author <a href="http://hiramchirino.com">chirino</a>
  */
 public class StompSubscription {
-    
+
     public static final String AUTO_ACK = Stomp.Headers.Subscribe.AckModeValues.AUTO;
     public static final String CLIENT_ACK = Stomp.Headers.Subscribe.AckModeValues.CLIENT;
 
-	private final ProtocolConverter protocolConverter;
+    private final ProtocolConverter protocolConverter;
     private final String subscriptionId;
     private final ConsumerInfo consumerInfo;
-    
+
     private final LinkedHashMap dispatchedMessage = new LinkedHashMap();
-    
+
     private String ackMode = AUTO_ACK;
-	private ActiveMQDestination destination;
+    private ActiveMQDestination destination;
 
-    
     public StompSubscription(ProtocolConverter stompTransport, String subscriptionId, ConsumerInfo consumerInfo) {
         this.protocolConverter = stompTransport;
-		this.subscriptionId = subscriptionId;
+        this.subscriptionId = subscriptionId;
         this.consumerInfo = consumerInfo;
     }
 
     void onMessageDispatch(MessageDispatch md) throws IOException, JMSException {
 
-    	ActiveMQMessage message = (ActiveMQMessage) md.getMessage();
-    	
+        ActiveMQMessage message = (ActiveMQMessage)md.getMessage();
+
         if (ackMode == CLIENT_ACK) {
             synchronized (this) {
-            	dispatchedMessage.put(message.getJMSMessageID(), message.getMessageId());
+                dispatchedMessage.put(message.getJMSMessageID(), message.getMessageId());
             }
         } else if (ackMode == AUTO_ACK) {
             MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1);
             protocolConverter.getTransportFilter().sendToActiveMQ(ack);
         }
-        
+
         StompFrame command = protocolConverter.convertMessage(message);
-        
-        command.setAction(Stomp.Responses.MESSAGE);        
-        if (subscriptionId!=null) {
+
+        command.setAction(Stomp.Responses.MESSAGE);
+        if (subscriptionId != null) {
             command.getHeaders().put(Stomp.Headers.Message.SUBSCRIPTION, subscriptionId);
         }
-        
+
         protocolConverter.getTransportFilter().sendToStomp(command);
     }
-    
+
     synchronized MessageAck onStompMessageAck(String messageId) {
-    	
-		if( !dispatchedMessage.containsKey(messageId) ) {
-			return null;
-		}
-    	
+
+        if (!dispatchedMessage.containsKey(messageId)) {
+            return null;
+        }
+
         MessageAck ack = new MessageAck();
         ack.setDestination(consumerInfo.getDestination());
         ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
         ack.setConsumerId(consumerInfo.getConsumerId());
-        
-        int count=0;
+
+        int count = 0;
         for (Iterator iter = dispatchedMessage.entrySet().iterator(); iter.hasNext();) {
-            
-        	Map.Entry entry = (Entry) iter.next();
-            String id = (String) entry.getKey();
-            MessageId msgid = (MessageId) entry.getValue();
-            
-            if( ack.getFirstMessageId()==null )
+
+            Map.Entry entry = (Entry)iter.next();
+            String id = (String)entry.getKey();
+            MessageId msgid = (MessageId)entry.getValue();
+
+            if (ack.getFirstMessageId() == null)
                 ack.setFirstMessageId(msgid);
 
             iter.remove();
             count++;
 
-            if( id.equals(messageId)  ) {
+            if (id.equals(messageId)) {
                 ack.setLastMessageId(msgid);
                 break;
             }
-            
+
         }
-        
+
         ack.setMessageCount(count);
         return ack;
     }
 
-	public String getAckMode() {
-		return ackMode;
-	}
-
-	public void setAckMode(String ackMode) {
-		this.ackMode = ackMode;
-	}
-
-	public String getSubscriptionId() {
-		return subscriptionId;
-	}
-
-	public void setDestination(ActiveMQDestination destination) {
-		this.destination = destination;
-	}
-
-	public ActiveMQDestination getDestination() {
-		return destination;
-	}
-
-	public ConsumerInfo getConsumerInfo() {
-		return consumerInfo;
-	}
+    public String getAckMode() {
+        return ackMode;
+    }
+
+    public void setAckMode(String ackMode) {
+        this.ackMode = ackMode;
+    }
+
+    public String getSubscriptionId() {
+        return subscriptionId;
+    }
+
+    public void setDestination(ActiveMQDestination destination) {
+        this.destination = destination;
+    }
+
+    public ActiveMQDestination getDestination() {
+        return destination;
+    }
+
+    public ConsumerInfo getConsumerInfo() {
+        return consumerInfo;
+    }
 
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFilter.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFilter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFilter.java Wed Aug  8 11:56:59 2007
@@ -28,16 +28,15 @@
 import org.apache.commons.logging.LogFactory;
 
 /**
- * The StompTransportFilter normally sits on top of a TcpTransport
- * that has been configured with the StompWireFormat and is used to
- * convert STOMP commands to ActiveMQ commands.
- *
- * All of the coversion work is done by delegating to the ProtocolConverter.
- *
+ * The StompTransportFilter normally sits on top of a TcpTransport that has been
+ * configured with the StompWireFormat and is used to convert STOMP commands to
+ * ActiveMQ commands. All of the coversion work is done by delegating to the
+ * ProtocolConverter.
+ * 
  * @author <a href="http://hiramchirino.com">chirino</a>
  */
 public class StompTransportFilter extends TransportFilter {
-	static final private Log log = LogFactory.getLog(StompTransportFilter.class);
+    static final private Log log = LogFactory.getLog(StompTransportFilter.class);
     private final ProtocolConverter protocolConverter;
 
     private final Object sendToActiveMQMutex = new Object();
@@ -46,60 +45,59 @@
     private final FrameTranslator frameTranslator;
 
     private boolean trace;
-    
+
     public StompTransportFilter(Transport next, FrameTranslator translator) {
-		super(next);
+        super(next);
         this.frameTranslator = translator;
         this.protocolConverter = new ProtocolConverter(this, translator);
     }
 
-	public void oneway(Object o) throws IOException {
+    public void oneway(Object o) throws IOException {
         try {
-        	final Command command = (Command) o;
-        	protocolConverter.onActiveMQCommad(command);
-		} catch (JMSException e) {
-			throw IOExceptionSupport.create(e);
-		}
-	}
+            final Command command = (Command)o;
+            protocolConverter.onActiveMQCommad(command);
+        } catch (JMSException e) {
+            throw IOExceptionSupport.create(e);
+        }
+    }
 
-	public void onCommand(Object command) {
+    public void onCommand(Object command) {
         try {
-    		if( trace ) {
-    			log.trace("Received: \n"+command);
-    		}
-        	protocolConverter.onStompCommad((StompFrame) command);
-		} catch (IOException e) {
-			onException(e);
-		} catch (JMSException e) {
-			onException(IOExceptionSupport.create(e));
-		}
-	}
-
-	public void sendToActiveMQ(Command command) {
-		synchronized(sendToActiveMQMutex) {
-			transportListener.onCommand(command);
-		}
-	}
-
-	public void sendToStomp(StompFrame command) throws IOException {
-		if( trace ) {
-			log.trace("Sending: \n"+command);
-		}
-		synchronized(sendToStompMutex) {
-			next.oneway(command);
-		}
-	}
+            if (trace) {
+                log.trace("Received: \n" + command);
+            }
+            protocolConverter.onStompCommad((StompFrame)command);
+        } catch (IOException e) {
+            onException(e);
+        } catch (JMSException e) {
+            onException(IOExceptionSupport.create(e));
+        }
+    }
+
+    public void sendToActiveMQ(Command command) {
+        synchronized (sendToActiveMQMutex) {
+            transportListener.onCommand(command);
+        }
+    }
 
-    public FrameTranslator getFrameTranslator()
-    {
+    public void sendToStomp(StompFrame command) throws IOException {
+        if (trace) {
+            log.trace("Sending: \n" + command);
+        }
+        synchronized (sendToStompMutex) {
+            next.oneway(command);
+        }
+    }
+
+    public FrameTranslator getFrameTranslator() {
         return frameTranslator;
     }
 
-	public boolean isTrace() {
-		return trace;
-	}
-
-	public void setTrace(boolean trace) {
-		this.trace = trace;
-	}
+    public boolean isTrace() {
+        return trace;
+    }
+
+    public void setTrace(boolean trace) {
+        this.trace = trace;
+    }
 }



Mime
View raw message