activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r745938 - in /activemq/sandbox/activemq-flow/src/test: java/org/apache/activemq/flow/Proto2WireFormatFactory.java java/org/apache/activemq/flow/Router.java resources/META-INF/services/org/apache/activemq/wireformat/proto2
Date Thu, 19 Feb 2009 17:30:12 GMT
Author: chirino
Date: Thu Feb 19 17:30:11 2009
New Revision: 745938

URL: http://svn.apache.org/viewvc?rev=745938&view=rev
Log:
Added a proto2 wire format which uses the new externalizable-style encoding/decoding, so that we
can benchmark protobuf encoding vs. more standard Java-style externalizable encoding.

Added:
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Proto2WireFormatFactory.java
    activemq/sandbox/activemq-flow/src/test/resources/META-INF/services/org/apache/activemq/wireformat/proto2
Modified:
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Router.java

Added: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Proto2WireFormatFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Proto2WireFormatFactory.java?rev=745938&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Proto2WireFormatFactory.java
(added)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Proto2WireFormatFactory.java
Thu Feb 19 17:30:11 2009
@@ -0,0 +1,272 @@
+package org.apache.activemq.flow;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
+
+import org.apache.activemq.flow.Commands.Destination.DestinationBean;
+import org.apache.activemq.flow.Commands.Destination.DestinationBuffer;
+import org.apache.activemq.flow.Commands.FlowControl.FlowControlBean;
+import org.apache.activemq.flow.Commands.FlowControl.FlowControlBuffer;
+import org.apache.activemq.flow.Commands.Message.MessageBean;
+import org.apache.activemq.flow.Commands.Message.MessageBuffer;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.util.DataByteArrayInputStream;
+import org.apache.activemq.util.DataByteArrayOutputStream;
+import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.activemq.wireformat.StatefulWireFormat;
+import org.apache.activemq.wireformat.WireFormat;
+import org.apache.activemq.wireformat.WireFormatFactory;
+
+public class Proto2WireFormatFactory implements WireFormatFactory {
+
+    public class TestWireFormat implements StatefulWireFormat {
+        private ByteBuffer currentOut;
+        private byte outType;
+        
+        private ByteBuffer currentIn;
+        private byte inType;
+        
+        /**
+         * Writes {@code value} to {@code out} as one frame: a type byte, a 4-byte
+         * length, then the encoded payload.
+         * <p>
+         * Type bytes: 0 = Message, 1 = String (UTF-8 bytes), 2 = DestinationBuffer,
+         * 3 = FlowControlBuffer. The class of {@code value} is matched exactly.
+         *
+         * @param value the object to encode
+         * @param out the sink that receives the frame
+         * @throws IOException if the class of {@code value} is not one of the four
+         *         supported types, or if writing fails
+         */
+        public void marshal(Object value, DataOutput out) throws IOException {
+            final Class<?> kind = value.getClass();
+
+            // Strings carry their raw UTF-8 bytes and need no scratch buffer.
+            if( kind == String.class ) {
+                out.writeByte(1);
+                byte[] utf8 = ((String) value).getBytes("UTF-8");
+                out.writeInt(utf8.length);
+                out.write(utf8);
+                return;
+            }
+
+            // Bean-backed types are serialized into a scratch buffer first so that
+            // the payload length is known before the payload itself is written.
+            final DataByteArrayOutputStream scratch;
+            if( kind == Message.class ) {
+                out.writeByte(0);
+                scratch = new DataByteArrayOutputStream();
+                MessageBean bean = ((Message) value).getProto().copy();
+                bean.writeExternal(scratch);
+            } else if( kind == DestinationBuffer.class ) {
+                out.writeByte(2);
+                scratch = new DataByteArrayOutputStream();
+                DestinationBean bean = ((DestinationBuffer) value).copy();
+                bean.writeExternal(scratch);
+            } else if( kind == FlowControlBuffer.class ) {
+                out.writeByte(3);
+                scratch = new DataByteArrayOutputStream();
+                FlowControlBean bean = ((FlowControlBuffer) value).copy();
+                bean.writeExternal(scratch);
+            } else {
+                throw new IOException("Unsupported type: "+value.getClass());
+            }
+
+            out.writeInt(scratch.size());
+            out.write(scratch.getData(), 0, scratch.size());
+        }
+
+        /**
+         * Reads one frame from {@code in}: a type byte, a 4-byte size, then the
+         * payload, and returns the decoded object.
+         * <p>
+         * Type bytes: 0 = Message, 1 = String (UTF-8 bytes), 2 = destination,
+         * 3 = flow control. For types 2 and 3 the frozen bean is returned.
+         *
+         * @param in the source positioned at the start of a frame
+         * @return the decoded object
+         * @throws IOException if the type byte is unknown, the string size is
+         *         negative, or reading fails
+         */
+        public Object unmarshal(DataInput in) throws IOException {
+            byte type = in.readByte();
+            int size = in.readInt();
+            switch(type) {
+                case 0: {
+                    // NOTE(review): size is not used for the bean types; readExternal
+                    // presumably consumes exactly the payload - confirm.
+                    MessageBean proto = new MessageBean();
+                    proto.readExternal(in);
+                    return new Message(proto.freeze());
+                }
+                case 1: {
+                    // Reject a corrupt length up front instead of failing with an
+                    // unchecked NegativeArraySizeException.
+                    if( size < 0 ) {
+                        throw new IOException("Invalid string length: " + size);
+                    }
+                    byte data[] = new byte[size];
+                    in.readFully(data);
+                    return new String(data, "UTF-8");
+                }
+                case 2: {
+                    DestinationBean proto = new DestinationBean();
+                    proto.readExternal(in);
+                    return proto.freeze();
+                }
+                case 3: {
+                    FlowControlBean proto = new FlowControlBean();
+                    proto.readExternal(in);
+                    return proto.freeze();
+                }
+                default:
+                    throw new IOException("Unknown type byte: " + type);
+            }
+        }
+
+        /**
+         * Incrementally writes {@code value} into {@code target} as one frame: a
+         * type byte, a 4-byte length, then the payload.
+         * <p>
+         * When no frame is in progress, {@code value} is encoded and the 5-byte
+         * header is written. If {@code target} fills up before the payload is
+         * complete, the remainder is kept and written by later calls; while a
+         * frame is in progress the {@code value} argument is not looked at.
+         *
+         * @param value the object to encode (Message, String, DestinationBuffer or
+         *        FlowControlBuffer, matched by exact class)
+         * @param target the buffer to write into
+         * @return true once the whole frame has been written; false if more room
+         *         is needed (including when fewer than 5 bytes are free for the header)
+         * @throws IOException if the type is unsupported or the payload is larger
+         *         than 1 MiB; in both cases nothing is written and no frame is
+         *         left in progress
+         */
+        public boolean marshal(Object value, ByteBuffer target) throws IOException
+        {
+            if(currentOut == null)
+            {
+                //Ensure room for the type byte and the 4-byte length:
+                if(target.remaining() < 5)
+                {
+                    return false;
+                }
+
+                ByteBuffer encoded;
+                if( value.getClass() == Message.class ) {
+                    DataByteArrayOutputStream baos = new DataByteArrayOutputStream();
+                    MessageBean proto = ((Message)value).getProto().copy();
+                    proto.writeExternal(baos);
+                    encoded = ByteBuffer.wrap(baos.getData(), 0, baos.size());
+                    outType = 0;
+                } else if( value.getClass() == String.class ) {
+                    outType = 1;
+                    try {
+                        encoded = ByteBuffer.wrap(((String)value).getBytes("utf-8"));
+                    } catch (UnsupportedEncodingException e) {
+                        //Shouldn't happen.
+                        throw IOExceptionSupport.create(e);
+                    }
+                } else if( value.getClass() == DestinationBuffer.class ) {
+                    outType = 2;
+                    DataByteArrayOutputStream baos = new DataByteArrayOutputStream();
+                    DestinationBean proto = ((DestinationBuffer)value).copy();
+                    proto.writeExternal(baos);
+                    encoded = ByteBuffer.wrap(baos.getData(), 0, baos.size());
+                } else if( value.getClass() == FlowControlBuffer.class ) {
+                    outType = 3;
+                    DataByteArrayOutputStream baos = new DataByteArrayOutputStream();
+                    FlowControlBean proto = ((FlowControlBuffer)value).copy();
+                    proto.writeExternal(baos);
+                    // The serialized bytes must become the pending payload; without
+                    // this the header write below would hit a null currentOut.
+                    encoded = ByteBuffer.wrap(baos.getData(), 0, baos.size());
+                } else {
+                    throw new IOException("Unsupported type: "+value.getClass());
+                }
+
+                // Check the size before touching target or currentOut, so an
+                // oversized packet leaves neither a stray header nor a pending frame.
+                if(encoded.remaining() > 1024*1024)
+                {
+                    throw new IOException("Packet exceeded max memory size!");
+                }
+                currentOut = encoded;
+
+                //Write type:
+                target.put(outType);
+                //Write length:
+                target.putInt(currentOut.remaining());
+            }
+
+            //Avoid overflow:
+            if(currentOut.remaining() > target.remaining())
+            {
+                // Temporarily cap the payload to what fits, then restore the limit.
+                int limit = currentOut.limit();
+                currentOut.limit(currentOut.position() + target.remaining());
+                target.put(currentOut);
+                currentOut.limit(limit);
+            }
+            else
+            {
+                target.put(currentOut);
+            }
+
+            if(!currentOut.hasRemaining())
+            {
+                currentOut = null;
+                return true;
+            }
+            return false;
+        }
+  
+        /**
+         * Unmarshals an object. When the object is read it is returned.
+         * @param source
+         * @return The object when unmarshalled, null otherwise
+         */
+        /**
+         * Incrementally reads one frame from {@code source}: a type byte, a 4-byte
+         * length, then the payload. Partial payloads are kept between calls.
+         *
+         * @param source the buffer to read from
+         * @return the decoded object once the whole frame has been read; null if
+         *         more data is needed
+         * @throws IOException if the length is negative or larger than 1 MiB, the
+         *         type byte is unknown, or decoding fails
+         */
+        public Object unMarshal(ByteBuffer source) throws IOException
+        {
+            if(currentIn == null)
+            {
+                // Wait until the full 5-byte header (type + length) is available.
+                if(source.remaining() < 5)
+                {
+                    return null;
+                }
+
+                inType = source.get();
+                int length = source.getInt();
+                // A corrupt negative length would otherwise fail with an unchecked
+                // NegativeArraySizeException.
+                if(length < 0)
+                {
+                    throw new IOException("Invalid packet length: " + length);
+                }
+                if(length > 1024*1024)
+                {
+                    throw new IOException("Packet exceeded max memory size!");
+                }
+                currentIn = ByteBuffer.wrap(new byte[length]);
+            }
+
+            // Only wait for more input when payload bytes are still missing; a
+            // zero-length payload is already complete once the header is read.
+            if(currentIn.hasRemaining())
+            {
+                if(!source.hasRemaining())
+                {
+                    return null;
+                }
+
+                if(source.remaining() > currentIn.remaining())
+                {
+                    // Temporarily cap source to this frame, then restore its limit.
+                    int limit = source.limit();
+                    source.limit(source.position() + currentIn.remaining());
+                    currentIn.put(source);
+                    source.limit(limit);
+                }
+                else
+                {
+                    currentIn.put(source);
+                }
+
+                //If we haven't finished the packet return to get more data:
+                if(currentIn.hasRemaining())
+                {
+                    return null;
+                }
+            }
+
+            // Clear the pending frame before decoding so a decode failure does not
+            // leave the same frame to be retried on every later call.
+            byte[] payload = currentIn.array();
+            currentIn = null;
+
+            switch(inType) {
+            case 0: {
+                DataByteArrayInputStream in = new DataByteArrayInputStream(payload);
+                MessageBean proto = new MessageBean();
+                proto.readExternal(in);
+                return new Message(proto.freeze());
+            }
+            case 1: {
+                return new String(payload, "utf-8");
+            }
+            case 2: {
+                DataByteArrayInputStream in = new DataByteArrayInputStream(payload);
+                DestinationBean proto = new DestinationBean();
+                proto.readExternal(in);
+                return proto.freeze();
+            }
+            case 3: {
+                DataByteArrayInputStream in = new DataByteArrayInputStream(payload);
+                FlowControlBean proto = new FlowControlBean();
+                proto.readExternal(in);
+                return proto.freeze();
+            }
+            default:
+                throw new IOException("Unknown type byte: " + inType);
+            }
+        }
+        
+        /**
+         * @return always 0; this wire format does not track a version
+         */
+        public int getVersion() {
+            return 0;
+        }
+        /**
+         * No-op: the supplied version is ignored.
+         *
+         * @param version ignored
+         */
+        public void setVersion(int version) {
+        }
+
+        /**
+         * @return always false, regardless of whether a partial frame is buffered
+         */
+        public boolean inReceive() {
+            return false;
+        }
+
+        /**
+         * Encodes {@code value} into a fresh in-memory buffer using
+         * {@link #marshal(Object, DataOutput)} and returns the bytes written.
+         *
+         * @param value the object to encode
+         * @return the encoded frame
+         * @throws IOException if encoding fails
+         */
+        public ByteSequence marshal(Object value) throws IOException {
+            final DataByteArrayOutputStream buffer = new DataByteArrayOutputStream();
+            marshal(value, buffer);
+            final ByteSequence encoded = buffer.toByteSequence();
+            return encoded;
+        }
+        
+        /**
+         * Decodes one frame held in {@code data} by wrapping it in a
+         * DataByteArrayInputStream and handing that stream to the stream-based
+         * unmarshal.
+         *
+         * @param data the bytes of an encoded frame
+         * @return the decoded object
+         * @throws IOException if decoding fails
+         */
+        public Object unmarshal(ByteSequence data) throws IOException {
+            final DataByteArrayInputStream stream = new DataByteArrayInputStream(data);
+            final Object decoded = unmarshal(stream);
+            return decoded;
+        }
+    }
+
+	/**
+	 * Creates a new wire format instance.
+	 *
+	 * @return a new TestWireFormat on every call
+	 */
+	public WireFormat createWireFormat() {
+		return new TestWireFormat();
+	}	
+
+}

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Router.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Router.java?rev=745938&r1=745937&r2=745938&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Router.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Router.java Thu
Feb 19 17:30:11 2009
@@ -11,19 +11,21 @@
 import org.apache.activemq.flow.MockBroker.DeliveryTarget;
 
 public class Router {
-    final HashMap<Destination, Collection<DeliveryTarget>> lookupTable = new
HashMap<Destination, Collection<DeliveryTarget>>();
+    final HashMap<String, Collection<DeliveryTarget>> lookupTable = new HashMap<String,
Collection<DeliveryTarget>>();
 
     final synchronized void bind(DeliveryTarget dt, Destination destination) {
-        Collection<DeliveryTarget> targets = lookupTable.get(destination);
+        String key = destination.getName();
+        Collection<DeliveryTarget> targets = lookupTable.get(key);
         if (targets == null) {
             targets = new ArrayList<DeliveryTarget>();
-            lookupTable.put(destination, targets);
+            lookupTable.put(key, targets);
         }
         targets.add(dt);
     }
 
     final void route(ISourceController<Message> source, Message msg) {
-        Collection<DeliveryTarget> targets = lookupTable.get(msg.getDestination());
+        String key = msg.getDestination().getName();
+        Collection<DeliveryTarget> targets = lookupTable.get(key);
         if( targets == null ) 
             return;
         for (DeliveryTarget dt : targets) {

Added: activemq/sandbox/activemq-flow/src/test/resources/META-INF/services/org/apache/activemq/wireformat/proto2
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/resources/META-INF/services/org/apache/activemq/wireformat/proto2?rev=745938&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/resources/META-INF/services/org/apache/activemq/wireformat/proto2
(added)
+++ activemq/sandbox/activemq-flow/src/test/resources/META-INF/services/org/apache/activemq/wireformat/proto2
Thu Feb 19 17:30:11 2009
@@ -0,0 +1,17 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+## 
+## http://www.apache.org/licenses/LICENSE-2.0
+## 
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+class=org.apache.activemq.flow.Proto2WireFormatFactory



Mime
View raw message