activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r463859 - in /incubator/activemq/sandbox/qpid/src: main/java/org/apache/activemq/qpid/transport/ main/java/org/apache/activemq/qpid/wireformat/ test/ test/resources/
Date Fri, 13 Oct 2006 22:34:03 GMT
Author: chirino
Date: Fri Oct 13 15:34:00 2006
New Revision: 463859

URL: http://svn.apache.org/viewvc?view=rev&rev=463859
Log:
Adding last needed bits to make the QpidWireFormat work.  also added in a simple Proxy main
class that sets up a qpid proxy that uses the qpid wireformat.  I ran this against the qpid
sever and some examples clients and thats how I verified that the wireformat is now working

Added:
    incubator/activemq/sandbox/qpid/src/main/java/org/apache/activemq/qpid/transport/Proxy.java
    incubator/activemq/sandbox/qpid/src/main/java/org/apache/activemq/qpid/wireformat/ProtocolInitiation.java
    incubator/activemq/sandbox/qpid/src/test/
    incubator/activemq/sandbox/qpid/src/test/resources/
    incubator/activemq/sandbox/qpid/src/test/resources/log4j.properties   (with props)
Modified:
    incubator/activemq/sandbox/qpid/src/main/java/org/apache/activemq/qpid/transport/QpidTransportFactory.java
    incubator/activemq/sandbox/qpid/src/main/java/org/apache/activemq/qpid/wireformat/QpidWireFormat.java

Added: incubator/activemq/sandbox/qpid/src/main/java/org/apache/activemq/qpid/transport/Proxy.java
URL: http://svn.apache.org/viewvc/incubator/activemq/sandbox/qpid/src/main/java/org/apache/activemq/qpid/transport/Proxy.java?view=auto&rev=463859
==============================================================================
--- incubator/activemq/sandbox/qpid/src/main/java/org/apache/activemq/qpid/transport/Proxy.java
(added)
+++ incubator/activemq/sandbox/qpid/src/main/java/org/apache/activemq/qpid/transport/Proxy.java
Fri Oct 13 15:34:00 2006
@@ -0,0 +1,64 @@
+/**
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.activemq.qpid.transport;
+
+import java.net.URI;
+
+import org.apache.activemq.proxy.ProxyConnector;
+
+/**
+ * Creates a QPID proxy server that accepts connecentions on qpid://localhost:5673
+ * and forwords them to qpid://localhost:5672 by default.  You can use this with
+ * a existing qpid client and sever to verify that the qpid protocol implementation
+ * in ActiveMQ can understand all commands between the client and server.
+ * 
+ * Usage Syntax:
+ *   
+ *   Main [bind-url [proxy-url]]
+ *   
+ * Usage Example:
+ *   
+ *   Main qpid://localhost:5673 qpid://localhost:5672
+ * 
+ * @author chirino
+ */
+public class Proxy {
+	public static void main(String[] args) throws Exception {
+				
+		String bind = "qpid://localhost:5673";
+		String proxy = "qpid://localhost:5672";
+		if( args.length > 0 ) {
+			bind = args[0];
+		}
+		if( args.length > 1 ) {
+			proxy = args[1];
+		}
+		
+		System.out.println("Proxying qpid connections from "+bind+" -> "+proxy);
+		
+		ProxyConnector connector = new ProxyConnector();
+		connector.setBind(new URI(bind));
+		connector.setRemote(new URI(proxy));
+		connector.start();
+		
+		synchronized(Proxy.class) {
+			Proxy.class.wait();
+		}
+		
+	}
+}

Modified: incubator/activemq/sandbox/qpid/src/main/java/org/apache/activemq/qpid/transport/QpidTransportFactory.java
URL: http://svn.apache.org/viewvc/incubator/activemq/sandbox/qpid/src/main/java/org/apache/activemq/qpid/transport/QpidTransportFactory.java?view=diff&rev=463859&r1=463858&r2=463859
==============================================================================
--- incubator/activemq/sandbox/qpid/src/main/java/org/apache/activemq/qpid/transport/QpidTransportFactory.java
(original)
+++ incubator/activemq/sandbox/qpid/src/main/java/org/apache/activemq/qpid/transport/QpidTransportFactory.java
Fri Oct 13 15:34:00 2006
@@ -17,9 +17,17 @@
  */
 package org.apache.activemq.qpid.transport;
 
+import java.io.IOException;
+import java.net.URI;
+import java.net.UnknownHostException;
 import java.util.Map;
 
+import javax.net.SocketFactory;
+
+import org.apache.activemq.qpid.wireformat.QpidWireFormat;
+import org.apache.activemq.transport.MutexTransport;
 import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.tcp.TcpTransport;
 import org.apache.activemq.transport.tcp.TcpTransportFactory;
 import org.apache.activemq.wireformat.WireFormat;
 
@@ -32,6 +40,20 @@
 
     protected String getDefaultWireFormatType() {
         return "qpid";
+    }
+
+    
+    public Transport configure(Transport transport, WireFormat wf, Map options) throws Exception
{
+    	transport = compositeConfigure(transport, wf, options);
+        transport = new MutexTransport(transport);
+        // transport = new ResponseCorrelator(transport);
+        return transport;
+    }
+
+    protected TcpTransport createTcpTransport(WireFormat wf, SocketFactory socketFactory,
URI location, URI localLocation) throws UnknownHostException, IOException {
+    	((QpidWireFormat)wf).setSendProtocolInitiation(true);
+    	((QpidWireFormat)wf).setWaitForProtocolInitiation(false);    	
+    	return super.createTcpTransport(wf, socketFactory, location, localLocation);
     }
 
     public Transport compositeConfigure(Transport transport, WireFormat format, Map options)
{

Added: incubator/activemq/sandbox/qpid/src/main/java/org/apache/activemq/qpid/wireformat/ProtocolInitiation.java
URL: http://svn.apache.org/viewvc/incubator/activemq/sandbox/qpid/src/main/java/org/apache/activemq/qpid/wireformat/ProtocolInitiation.java?view=auto&rev=463859
==============================================================================
--- incubator/activemq/sandbox/qpid/src/main/java/org/apache/activemq/qpid/wireformat/ProtocolInitiation.java
(added)
+++ incubator/activemq/sandbox/qpid/src/main/java/org/apache/activemq/qpid/wireformat/ProtocolInitiation.java
Fri Oct 13 15:34:00 2006
@@ -0,0 +1,94 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.activemq.qpid.wireformat;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.emory.mathcs.backport.java.util.Arrays;
+
+public class ProtocolInitiation 
+{
+    static public byte[] MAGIC = new byte[]{'A','M','Q','P'};
+
+    private static byte CURRENT_PROTOCOL_CLASS = 1;
+    private static final int CURRENT_PROTOCOL_INSTANCE = 1;
+
+    public byte protocolClass = CURRENT_PROTOCOL_CLASS;
+    public byte protocolInstance = CURRENT_PROTOCOL_INSTANCE;
+    public byte protocolMajor;
+    public byte protocolMinor;
+
+    public ProtocolInitiation() {}
+    public ProtocolInitiation(int major, int minor)
+    {
+        protocolMajor = (byte) major;
+        protocolMinor = (byte) minor;
+    }
+
+    public void marshall(DataOutput buffer) throws IOException
+    {
+        buffer.write(MAGIC);
+        
+        buffer.writeByte(protocolClass);
+        buffer.writeByte(protocolInstance);
+        buffer.writeByte(protocolMajor);
+        buffer.writeByte(protocolMinor);
+    }
+
+    public void unmarshall(DataInput buffer) throws IOException
+    {
+    	byte []magic = new byte[MAGIC.length];
+    	buffer.readFully(magic);
+    	if( !Arrays.equals(magic, MAGIC) ) {
+    		throw new IOException("Invalid protocol header.");
+    	}
+    	
+    	protocolClass = buffer.readByte();
+    	protocolInstance = buffer.readByte();
+    	protocolMajor = buffer.readByte();
+    	protocolMinor = buffer.readByte();    	
+    }
+    
+	public byte getProtocolClass() {
+		return protocolClass;
+	}
+	public void setProtocolClass(byte protocolClass) {
+		this.protocolClass = protocolClass;
+	}
+	public byte getProtocolInstance() {
+		return protocolInstance;
+	}
+	public void setProtocolInstance(byte protocolInstance) {
+		this.protocolInstance = protocolInstance;
+	}
+	public byte getProtocolMajor() {
+		return protocolMajor;
+	}
+	public void setProtocolMajor(byte protocolMajor) {
+		this.protocolMajor = protocolMajor;
+	}
+	public byte getProtocolMinor() {
+		return protocolMinor;
+	}
+	public void setProtocolMinor(byte protocolMinor) {
+		this.protocolMinor = protocolMinor;
+	}
+
+}

Modified: incubator/activemq/sandbox/qpid/src/main/java/org/apache/activemq/qpid/wireformat/QpidWireFormat.java
URL: http://svn.apache.org/viewvc/incubator/activemq/sandbox/qpid/src/main/java/org/apache/activemq/qpid/wireformat/QpidWireFormat.java?view=diff&rev=463859&r1=463858&r2=463859
==============================================================================
--- incubator/activemq/sandbox/qpid/src/main/java/org/apache/activemq/qpid/wireformat/QpidWireFormat.java
(original)
+++ incubator/activemq/sandbox/qpid/src/main/java/org/apache/activemq/qpid/wireformat/QpidWireFormat.java
Fri Oct 13 15:34:00 2006
@@ -40,7 +40,12 @@
 public class QpidWireFormat implements WireFormat {
 	
 	private int version=8;
+	private int minorVersion=0;
+	
 	private final HashMap methodBodyMarshallers = new HashMap();
+	
+	private boolean sendProtocolInitiation = false;
+	private boolean waitForProtocolInitiation = true;
 
 	public int getVersion() {
 		return version;
@@ -71,10 +76,17 @@
 	/////////////////////////////////////////////////////////////////////////
 	public Object unmarshal(DataInputStream buffer) throws IOException {
 		
+		if( waitForProtocolInitiation ) {
+			ProtocolInitiation initiation = new ProtocolInitiation();
+			initiation.unmarshall(buffer);
+			waitForProtocolInitiation=false;
+			return initiation;
+		}
+
 		Frame rc=null;
         final byte type = buffer.readByte();
         int channel = EncodingUtils.readUnsignedShort(buffer); 
-        final long bodySize = EncodingUtils.readUnsignedInteger(buffer);
+        final long length = EncodingUtils.readUnsignedInteger(buffer);
         
         switch( type ) {
         case MethodBody.TYPE:        	
@@ -83,7 +95,7 @@
             
         	MethodBodyMarshaller marshaller = getAMQMethodBodyMarshaller(clazz, method);   
    	
         	MethodBody methodBody = marshaller.createBody();
-            marshaller.populateFromBuffer(buffer, bodySize, methodBody);
+            marshaller.populateFromBuffer(buffer, length, methodBody);
             methodBody.channel=channel;
             rc = methodBody;
             break;
@@ -108,9 +120,7 @@
         	        	
         case ContentBody.TYPE:
         	
-        	ContentBody contentBody = new ContentBody();
-        	
-            long length = EncodingUtils.readUnsignedInteger(buffer);
+        	ContentBody contentBody = new ContentBody();        	
             byte[] data = new byte [(int) length];            
             buffer.readFully(data);
             
@@ -139,7 +149,7 @@
         byte marker = buffer.readByte();
         if ((marker & 0xFF) != 0xCE)
         {
-            throw new IOException("End of frame marker not found. Read " + marker + " size="
+ bodySize + " type=" + type);
+            throw new IOException("End of frame marker not found. Read " + marker + " size="
+ length + " type=" + type);
         }
         return rc;
 	}
@@ -147,6 +157,17 @@
 	
 	public void marshal(Object object, DataOutputStream buffer) throws IOException {
 		
+		if( sendProtocolInitiation ) {
+			
+			ProtocolInitiation initiation = new ProtocolInitiation(getVersion(), getMinorVersion());
+			initiation.marshall(buffer);
+			sendProtocolInitiation=false;
+			
+			if( object instanceof ProtocolInitiation ) {
+				return;
+			}
+		}
+		
 		Frame frame = (Frame) object;
     	byte type = frame.getType();    	
         buffer.writeByte(type);
@@ -179,7 +200,6 @@
             break;
             
         case HeartbeatBody.TYPE:
-        	HeartbeatBody heartbeatBody = (HeartbeatBody) frame;
             EncodingUtils.writeUnsignedInteger(buffer, 0); // No payload
             break;
             
@@ -190,6 +210,24 @@
         
         buffer.writeByte((byte) 0xCE);
     }
+	public boolean isSendProtocolInitiation() {
+		return sendProtocolInitiation;
+	}
+	public void setSendProtocolInitiation(boolean sendProtocolInitiation) {
+		this.sendProtocolInitiation = sendProtocolInitiation;
+	}
+	public boolean isWaitForProtocolInitiation() {
+		return waitForProtocolInitiation;
+	}
+	public void setWaitForProtocolInitiation(boolean waitForProtocolInitiation) {
+		this.waitForProtocolInitiation = waitForProtocolInitiation;
+	}
+	public int getMinorVersion() {
+		return minorVersion;
+	}
+	public void setMinorVersion(int minorVersion) {
+		this.minorVersion = minorVersion;
+	}
 	
 	
 }

Added: incubator/activemq/sandbox/qpid/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/incubator/activemq/sandbox/qpid/src/test/resources/log4j.properties?view=auto&rev=463859
==============================================================================
--- incubator/activemq/sandbox/qpid/src/test/resources/log4j.properties (added)
+++ incubator/activemq/sandbox/qpid/src/test/resources/log4j.properties Fri Oct 13 15:34:00
2006
@@ -0,0 +1,29 @@
+## ------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+## 
+## http://www.apache.org/licenses/LICENSE-2.0
+## 
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ------------------------------------------------------------------------
+
+#
+# The logging properties used for eclipse testing, We want to see debug output on the console.
+#
+log4j.rootLogger=WARN, out
+
+log4j.logger.org.apache.activemq=INFO
+
+# CONSOLE appender not used by default
+log4j.appender.out=org.apache.log4j.ConsoleAppender
+log4j.appender.out.layout=org.apache.log4j.PatternLayout
+log4j.appender.out.layout.ConversionPattern=[%30.30t] %-30.30c{1} %-5p %m%n
+#log4j.appender.out.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n

Propchange: incubator/activemq/sandbox/qpid/src/test/resources/log4j.properties
------------------------------------------------------------------------------
    svn:executable = *



Mime
View raw message