activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r515863 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/ main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/command/ main/java/org/apache/activemq/openwire/ test/java/org/apache/activemq/transport/tcp/
Date Thu, 08 Mar 2007 00:22:19 GMT
Author: chirino
Date: Wed Mar  7 16:22:18 2007
New Revision: 515863

URL: http://svn.apache.org/viewvc?view=rev&rev=515863
Log:
Expose the wire format protocol to the ActiveMQConnection and TransportConnection objects
so that they know when then can use more advanced protocol options.  This will be needed
to implement producer flow control acking.


Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/WireformatNegociationTest.java
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/CommandTypes.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java?view=diff&rev=515863&r1=515862&r2=515863
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
Wed Mar  7 16:22:18 2007
@@ -25,6 +25,15 @@
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.jms.Connection;
 import javax.jms.ConnectionConsumer;
@@ -45,6 +54,7 @@
 import javax.jms.TopicSession;
 import javax.jms.XAConnection;
 
+import org.apache.activemq.blob.BlobTransferPolicy;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQMessage;
 import org.apache.activemq.command.ActiveMQTempDestination;
@@ -52,6 +62,7 @@
 import org.apache.activemq.command.ActiveMQTempTopic;
 import org.apache.activemq.command.BrokerInfo;
 import org.apache.activemq.command.Command;
+import org.apache.activemq.command.CommandTypes;
 import org.apache.activemq.command.ConnectionControl;
 import org.apache.activemq.command.ConnectionError;
 import org.apache.activemq.command.ConnectionId;
@@ -70,6 +81,7 @@
 import org.apache.activemq.command.Response;
 import org.apache.activemq.command.SessionId;
 import org.apache.activemq.command.ShutdownInfo;
+import org.apache.activemq.command.WireFormatInfo;
 import org.apache.activemq.management.JMSConnectionStatsImpl;
 import org.apache.activemq.management.JMSStatsImpl;
 import org.apache.activemq.management.StatsCapable;
@@ -82,19 +94,9 @@
 import org.apache.activemq.util.JMSExceptionSupport;
 import org.apache.activemq.util.LongSequenceGenerator;
 import org.apache.activemq.util.ServiceSupport;
-import org.apache.activemq.blob.BlobTransferPolicy;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
 
 public class ActiveMQConnection implements Connection, TopicConnection, QueueConnection,
StatsCapable, Closeable,  StreamConnection, TransportListener {
 
@@ -166,7 +168,10 @@
     private final CountDownLatch brokerInfoReceived = new CountDownLatch(1);
     private BrokerInfo brokerInfo;
     private IOException firstFailureError;
-
+    
+    // Assume that protocol is the latest.  Change to the actual protocol
+    // version when a WireFormatInfo is received.
+    private AtomicInteger protocolVersion=new AtomicInteger(CommandTypes.PROTOCOL_VERSION);
 
     /**
      * Construct an <code>ActiveMQConnection</code>
@@ -1562,6 +1567,8 @@
                 onConnectionControl((ConnectionControl) command);
             }else if (command instanceof ConsumerControl){
                 onConsumerControl((ConsumerControl) command);
+            }else if ( command.isWireFormatInfo() ) {
+            	onWireFormatInfo((WireFormatInfo)command);
             }
         }
         for (Iterator iter = transportListeners.iterator(); iter.hasNext();) {
@@ -1570,7 +1577,12 @@
         }
     }
 
-    /**
+    protected void onWireFormatInfo(WireFormatInfo info) {
+    	protocolVersion.set(info.getVersion());
+	}
+
+
+	/**
      * Used for handling async exceptions
      * 
      * @param error
@@ -1989,4 +2001,9 @@
     protected BlobTransferPolicy createBlobTransferPolicy() {
         return new BlobTransferPolicy();
     }
+
+
+	public int getProtocolVersion() {
+		return protocolVersion.get();
+	}
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?view=diff&rev=515863&r1=515862&r2=515863
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
Wed Mar  7 16:22:18 2007
@@ -26,6 +26,7 @@
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.activemq.Service;
@@ -34,6 +35,7 @@
 import org.apache.activemq.broker.region.RegionBroker;
 import org.apache.activemq.command.BrokerInfo;
 import org.apache.activemq.command.Command;
+import org.apache.activemq.command.CommandTypes;
 import org.apache.activemq.command.ConnectionControl;
 import org.apache.activemq.command.ConnectionError;
 import org.apache.activemq.command.ConnectionId;
@@ -123,6 +125,7 @@
     private CountDownLatch dispatchStoppedLatch = new CountDownLatch(1);
     protected AtomicBoolean dispatchStopped=new AtomicBoolean(false);
     private boolean networkConnection;
+    private AtomicInteger protocolVersion=new AtomicInteger(CommandTypes.PROTOCOL_VERSION);
     
     static class ConnectionState extends org.apache.activemq.state.ConnectionState{
 
@@ -326,6 +329,7 @@
 
     public Response processWireFormat(WireFormatInfo info) throws Exception{
         wireFormatInfo=info;
+    	protocolVersion.set(info.getVersion());
         return null;
     }
 
@@ -1157,6 +1161,10 @@
 			log.debug("Could not stop transport: "+e,e);
 		}
     	}
+	}
+	
+	public int getProtocolVersion() {
+		return protocolVersion.get();
 	}
 
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/CommandTypes.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/CommandTypes.java?view=diff&rev=515863&r1=515862&r2=515863
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/CommandTypes.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/CommandTypes.java
Wed Mar  7 16:22:18 2007
@@ -23,6 +23,9 @@
  * @version $Revision: 1.21 $
  */
 public interface CommandTypes {
+	
+	// What is the latest version of the openwire protocol
+    byte  PROTOCOL_VERSION                  = 3;
 
     // A marshaling layer can use this type to specify a null object.
     byte  NULL                              = 0;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java?view=diff&rev=515863&r1=515862&r2=515863
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java
Wed Mar  7 16:22:18 2007
@@ -17,26 +17,31 @@
  */
 package org.apache.activemq.openwire;
 
-import org.apache.activemq.command.CommandTypes;
-import org.apache.activemq.command.DataStructure;
-import org.apache.activemq.command.MarshallAware;
-import org.apache.activemq.command.WireFormatInfo;
-import org.apache.activemq.util.*;
-import org.apache.activemq.wireformat.WireFormat;
-
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.lang.reflect.Method;
 import java.util.HashMap;
 
+import org.apache.activemq.command.CommandTypes;
+import org.apache.activemq.command.DataStructure;
+import org.apache.activemq.command.MarshallAware;
+import org.apache.activemq.command.WireFormatInfo;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.util.ByteSequenceData;
+import org.apache.activemq.util.ClassLoading;
+import org.apache.activemq.util.DataByteArrayInputStream;
+import org.apache.activemq.util.DataByteArrayOutputStream;
+import org.apache.activemq.util.IdGenerator;
+import org.apache.activemq.wireformat.WireFormat;
+
 /**
  * 
  * @version $Revision$
  */
 final public class OpenWireFormat implements WireFormat {
 
-    public static final int DEFAULT_VERSION = 3;
+    public static final int DEFAULT_VERSION = CommandTypes.PROTOCOL_VERSION;
 
     static final byte NULL_TYPE = CommandTypes.NULL;
     private static final int MARSHAL_CACHE_SIZE = Short.MAX_VALUE/2;
@@ -561,15 +566,28 @@
 			throw new IllegalStateException("Wireformat cannot not be renegotiated.");
 		
 		this.setVersion(min(preferedWireFormatInfo.getVersion(), info.getVersion()) );
+		info.setVersion(this.getVersion());
+		
 		this.stackTraceEnabled = info.isStackTraceEnabled() && preferedWireFormatInfo.isStackTraceEnabled();
+		info.setStackTraceEnabled(this.stackTraceEnabled);
+		
 		this.tcpNoDelayEnabled = info.isTcpNoDelayEnabled() && preferedWireFormatInfo.isTcpNoDelayEnabled();
+		info.setTcpNoDelayEnabled(this.tcpNoDelayEnabled);
+		
 		this.cacheEnabled = info.isCacheEnabled() && preferedWireFormatInfo.isCacheEnabled();
+		info.setCacheEnabled(this.cacheEnabled);
+		
 		this.tightEncodingEnabled = info.isTightEncodingEnabled() && preferedWireFormatInfo.isTightEncodingEnabled();
+		info.setTightEncodingEnabled(this.tightEncodingEnabled);
+		
 		this.sizePrefixDisabled = info.isSizePrefixDisabled() && preferedWireFormatInfo.isSizePrefixDisabled();
+		info.setSizePrefixDisabled(this.sizePrefixDisabled);
 		
 		if( cacheEnabled ) {
 			
 			int size = Math.min(preferedWireFormatInfo.getCacheSize(), info.getCacheSize());
+			info.setCacheSize(size);
+			
 			if( size == 0 ) {
 				size = MARSHAL_CACHE_SIZE;
 			}

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/WireformatNegociationTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/WireformatNegociationTest.java?view=auto&rev=515863
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/WireformatNegociationTest.java
(added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/WireformatNegociationTest.java
Wed Mar  7 16:22:18 2007
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.tcp;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+import org.apache.activemq.CombinationTestSupport;
+import org.apache.activemq.command.CommandTypes;
+import org.apache.activemq.command.WireFormatInfo;
+import org.apache.activemq.openwire.OpenWireFormat;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportAcceptListener;
+import org.apache.activemq.transport.TransportFactory;
+import org.apache.activemq.transport.TransportListener;
+import org.apache.activemq.transport.TransportServer;
+
+import javax.net.SocketFactory;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+
+public class WireformatNegociationTest extends CombinationTestSupport {
+    
+    private TransportServer server;
+    private Transport clientTransport;
+    private Transport serverTransport;
+    
+    private final AtomicReference<WireFormatInfo> clientWF = new AtomicReference<WireFormatInfo>();
+    private final AtomicReference<WireFormatInfo> serverWF = new AtomicReference<WireFormatInfo>();
+    private final AtomicReference<Exception> asyncError = new AtomicReference<Exception>();
+    private final AtomicBoolean ignoreAsycError = new AtomicBoolean();
+    
+    private final CountDownLatch negociationCounter = new CountDownLatch(2);
+            
+    protected void setUp() throws Exception {
+        super.setUp();
+    }
+
+    /**
+     * @throws Exception
+     * @throws URISyntaxException
+     */
+    private void startClient(String uri) throws Exception, URISyntaxException {
+        clientTransport = TransportFactory.connect(new URI(uri));
+        clientTransport.setTransportListener(new TransportListener() {
+            public void onCommand(Object command) {
+            	if( command instanceof WireFormatInfo ) {
+            		clientWF.set((WireFormatInfo) command);
+            		negociationCounter.countDown();
+            	}
+            }
+            public void onException(IOException error) {
+                if( !ignoreAsycError.get() ) {
+                    log.info("Client transport error: ", error);
+                    asyncError.set(error);
+            		negociationCounter.countDown();
+                }
+            }
+            public void transportInterupted() {
+            }
+            public void transportResumed() {
+            }});
+        clientTransport.start();
+    }
+
+    /**
+     * @throws IOException
+     * @throws URISyntaxException
+     * @throws Exception
+     */
+    private void startServer(String uri ) throws IOException, URISyntaxException, Exception
{
+        server = TransportFactory.bind("localhost", new URI(uri));
+        server.setAcceptListener(new TransportAcceptListener(){
+            public void onAccept(Transport transport) {
+                try {
+                    log.info("["+getName()+"] Server Accepted a Connection");
+                    serverTransport = transport;
+                    serverTransport.setTransportListener(new TransportListener() {
+                        public void onCommand(Object command) {
+                        	if( command instanceof WireFormatInfo ) {
+                        		serverWF.set((WireFormatInfo) command);
+                        		negociationCounter.countDown();
+                        	}
+                        }
+                        public void onException(IOException error) {
+                            if( !ignoreAsycError.get() ) {
+                                log.info("Server transport error: ", error);
+                                asyncError.set(error);
+                        		negociationCounter.countDown();
+                            }
+                        }
+                        public void transportInterupted() {
+                        }
+                        public void transportResumed() {
+                        }});
+                    serverTransport.start();
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+
+            public void onAcceptError(Exception error) {
+                error.printStackTrace();
+            }
+        });
+        server.start();
+    }
+    
+    protected void tearDown() throws Exception {
+    	ignoreAsycError.set(true);
+        try {
+            if( clientTransport!=null )
+                clientTransport.stop();
+            if( serverTransport!=null )
+                serverTransport.stop();
+            if( server!=null )
+                server.stop();
+        } catch (Throwable e) {
+            e.printStackTrace();
+        }
+        super.tearDown();
+    }
+    
+    
+    /**
+     * @throws Exception
+     */
+    public void testWireFomatInfoSeverVersion1() throws Exception {
+        
+    	startServer("tcp://localhost:61616?wireFormat.version=1");
+        startClient("tcp://localhost:61616");
+        
+        assertTrue("Connect timeout", negociationCounter.await(10, TimeUnit.SECONDS));
+        assertNull("Async error: "+asyncError, asyncError.get());
+        
+        assertNotNull(clientWF.get());
+        assertEquals(1, clientWF.get().getVersion());
+        
+        assertNotNull(serverWF.get());
+        assertEquals(1, serverWF.get().getVersion());
+    }
+    
+    /**
+     * @throws Exception
+     */
+    public void testWireFomatInfoClientVersion1() throws Exception {
+        
+    	startServer("tcp://localhost:61616");
+        startClient("tcp://localhost:61616?wireFormat.version=1");
+        
+        assertTrue("Connect timeout", negociationCounter.await(10, TimeUnit.SECONDS));
+        assertNull("Async error: "+asyncError, asyncError.get());
+        
+        assertNotNull(clientWF.get());
+        assertEquals(1, clientWF.get().getVersion());
+        
+        assertNotNull(serverWF.get());
+        assertEquals(1, serverWF.get().getVersion());
+    }
+
+    /**
+     * @throws Exception
+     */
+    public void testWireFomatInfoCurrentVersion() throws Exception {
+        
+    	startServer("tcp://localhost:61616");
+        startClient("tcp://localhost:61616");
+        
+        assertTrue("Connect timeout", negociationCounter.await(10, TimeUnit.SECONDS));
+        assertNull("Async error: "+asyncError, asyncError.get());
+        
+        assertNotNull(clientWF.get());
+        assertEquals(CommandTypes.PROTOCOL_VERSION, clientWF.get().getVersion());
+        
+        assertNotNull(serverWF.get());
+        assertEquals(CommandTypes.PROTOCOL_VERSION, serverWF.get().getVersion());
+    }
+
+}



Mime
View raw message