activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r745111 - in /activemq/sandbox/activemq-flow: ./ src/test/java/org/apache/activemq/flow/
Date Tue, 17 Feb 2009 14:59:46 GMT
Author: chirino
Date: Tue Feb 17 14:59:46 2009
New Revision: 745111

URL: http://svn.apache.org/viewvc?rev=745111&view=rev
Log:
Switching to the alternative protobuf api

Modified:
    activemq/sandbox/activemq-flow/pom.xml
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Message.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/ProtoWireFormatFactory.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConnection.java

Modified: activemq/sandbox/activemq-flow/pom.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/pom.xml?rev=745111&r1=745110&r2=745111&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/pom.xml (original)
+++ activemq/sandbox/activemq-flow/pom.xml Tue Feb 17 14:59:46 2009
@@ -62,6 +62,9 @@
       <plugin>
         <groupId>org.apache.activemq.protobuf</groupId>
         <artifactId>activemq-protobuf</artifactId>
+        <configuration>
+          <type>alt</type>
+        </configuration>
          <executions>
           <execution>
             <goals>

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Message.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Message.java?rev=745111&r1=745110&r2=745111&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Message.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Message.java Tue Feb 17 14:59:46 2009
@@ -19,6 +19,8 @@
 import java.io.Serializable;
 
 import org.apache.activemq.flow.Commands.Destination;
+import org.apache.activemq.flow.Commands.Message.MessageBean;
+import org.apache.activemq.flow.Commands.Message.MessageBuffer;
 import org.apache.activemq.queue.Mapper;
 
 public class Message implements Serializable {
@@ -41,14 +43,16 @@
     public static final short TYPE_FLOW_CLOSE = 3;
 
     transient Flow flow;
-    private Commands.Message message = new Commands.Message();
+    private MessageBuffer message;
 
     Message(long msgId, int producerId, String msg, Flow flow, Destination dest, int priority) {
-        this.message.setMsgId(msgId);
-        this.message.setProducerId(producerId);
-        this.message.setMsg(msg);
-        this.message.setDest(dest);
-        this.message.setPriority(priority);
+        MessageBean message = new MessageBean();
+        message.setMsgId(msgId);
+        message.setProducerId(producerId);
+        message.setMsg(msg);
+        message.setDest(dest);
+        message.setPriority(priority);
+        this.message = message.freeze();
         this.flow = flow;
     }
 
@@ -57,7 +61,7 @@
         this.flow = m.flow;
     }
 
-    public Message(Commands.Message m) {
+    public Message(MessageBuffer m) {
         this.message=m;
     }
 
@@ -66,9 +70,7 @@
     }
 
     public void setProperty(String matchProp) {
-        Commands.Message clone = message.clone();
-        clone.addProperty(matchProp);
-        message = clone;
+        message = message.copy().addProperty(matchProp).freeze();
     }
 
     public boolean match(String matchProp) {
@@ -83,9 +85,7 @@
     }
 
     public void incrementHopCount() {
-        Commands.Message clone = message.clone();
-        clone.setHopCount(message.getHopCount());
-        message = clone;
+        message = message.copy().setHopCount(message.getHopCount()).freeze();
     }
 
     public final int getHopCount() {
@@ -120,7 +120,7 @@
         return message.getProducerId();
     }
 
-    public Commands.Message getProto() {
+    public MessageBuffer getProto() {
         return message;
     }
 }

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java?rev=745111&r1=745110&r2=745111&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java Tue Feb 17 14:59:46 2009
@@ -26,6 +26,8 @@
 import org.apache.activemq.dispatch.IDispatcher;
 import org.apache.activemq.dispatch.PriorityPooledDispatcher;
 import org.apache.activemq.flow.Commands.Destination;
+import org.apache.activemq.flow.Commands.Destination.DestinationBean;
+import org.apache.activemq.flow.Commands.Destination.DestinationBuffer;
 import org.apache.activemq.metric.MetricAggregator;
 import org.apache.activemq.metric.Period;
 import org.apache.activemq.queue.Mapper;
@@ -49,7 +51,7 @@
     boolean ptp = false;
 
     // Set to use tcp IO
-    boolean tcp = false;
+    boolean tcp = true;
 
     // Set's the number of threads to use:
     private final int asyncThreadPoolSize = Runtime.getRuntime().availableProcessors();
@@ -339,12 +341,13 @@
             brokers.add(sendBroker);
         }
 
-        Destination[] dests = new Destination[destCount];
+        DestinationBuffer[] dests = new DestinationBuffer[destCount];
 
         for (int i = 0; i < destCount; i++) {
-            dests[i] = new Destination();
-            dests[i].setName("dest" + (i + 1));
-            dests[i].setPtp(ptp);
+            DestinationBean bean = new DestinationBean();
+            bean.setName("dest" + (i + 1));
+            bean.setPtp(ptp);
+            dests[i] = bean.freeze();
             if (ptp) {
                 MockQueue queue = createQueue(sendBroker, dests[i]);
                 sendBroker.addQueue(queue);

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/ProtoWireFormatFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/ProtoWireFormatFactory.java?rev=745111&r1=745110&r2=745111&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/ProtoWireFormatFactory.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/ProtoWireFormatFactory.java Tue Feb 17 14:59:46 2009
@@ -6,8 +6,9 @@
 import java.io.UnsupportedEncodingException;
 import java.nio.ByteBuffer;
 
-import org.apache.activemq.flow.Commands.Destination;
-import org.apache.activemq.flow.Commands.FlowControl;
+import org.apache.activemq.flow.Commands.Destination.DestinationBuffer;
+import org.apache.activemq.flow.Commands.FlowControl.FlowControlBuffer;
+import org.apache.activemq.flow.Commands.Message.MessageBuffer;
 import org.apache.activemq.protobuf.Buffer;
 import org.apache.activemq.util.ByteSequence;
 import org.apache.activemq.util.IOExceptionSupport;
@@ -27,7 +28,7 @@
         public void marshal(Object value, DataOutput out) throws IOException {
             if( value.getClass() == Message.class ) {
                 out.writeByte(0);
-                Commands.Message proto = ((Message)value).getProto();
+                MessageBuffer proto = ((Message)value).getProto();
                 Buffer buffer = proto.toUnframedBuffer();
                 out.writeInt(buffer.getLength());
                 out.write(buffer.getData(), buffer.getOffset(), buffer.getLength());
@@ -37,15 +38,15 @@
                 byte[] bytes = value2.getBytes("UTF-8");
                 out.writeInt(bytes.length);
                 out.write(bytes);
-            } else if( value.getClass() == Destination.class ) {
+            } else if( value.getClass() == DestinationBuffer.class ) {
                 out.writeByte(2);
-                Destination proto = (Destination)value;
+                DestinationBuffer proto = (DestinationBuffer)value;
                 Buffer buffer = proto.toUnframedBuffer();
                 out.writeInt(buffer.getLength());
                 out.write(buffer.getData(), buffer.getOffset(), buffer.getLength());
-            }else if( value.getClass() == FlowControl.class ) {
+            }else if( value.getClass() == FlowControlBuffer.class ) {
                 out.writeByte(3);
-                FlowControl proto = (FlowControl)value;
+                FlowControlBuffer proto = (FlowControlBuffer)value;
                 Buffer buffer = proto.toUnframedBuffer();
                 out.writeInt(buffer.getLength());
                 out.write(buffer.getData(), buffer.getOffset(), buffer.getLength());
@@ -61,18 +62,15 @@
             in.readFully(data);
             switch(type) {
                 case 0:
-                    Commands.Message m = new Commands.Message();
-                    m.mergeUnframed(data);
+                    MessageBuffer m = MessageBuffer.parseUnframed(data);
                     return new Message(m);
                 case 1:
                     return new String(data, "UTF-8");
                 case 2:
-                    Destination d = new Destination();
-                    d.mergeUnframed(data);
+                    DestinationBuffer d = DestinationBuffer.parseUnframed(data);
                     return d;
                 case 3:
-                    FlowControl fc = new FlowControl();
-                    fc.mergeUnframed(data);
+                    FlowControlBuffer fc = FlowControlBuffer.parseUnframed(data);
                     return fc;
                 default:
                     throw new IOException("Unknonw type byte: ");
@@ -101,12 +99,12 @@
                         //Shouldn't happen.
                         throw IOExceptionSupport.create(e);
                     }
-                } else if( value.getClass() == Destination.class ) {
+                } else if( value.getClass() == DestinationBuffer.class ) {
                 	outType = 2;
-                    currentOut = ByteBuffer.wrap(((Destination)value).toUnframedByteArray());
-                }else if( value.getClass() == FlowControl.class ) {
+                    currentOut = ByteBuffer.wrap(((DestinationBuffer)value).toUnframedByteArray());
+                }else if( value.getClass() == FlowControlBuffer.class ) {
                 	outType = 3;
-                    currentOut = ByteBuffer.wrap(((FlowControl)value).toUnframedByteArray());
+                    currentOut = ByteBuffer.wrap(((FlowControlBuffer)value).toUnframedByteArray());
                 }else {
                     throw new IOException("Unsupported type: "+value.getClass());
                 }
@@ -192,28 +190,18 @@
             Object ret = null;
             switch(inType) {
             case 0:
-            	Commands.Message m = new Commands.Message();
-            	try
-            	{
-            		m.mergeUnframed(currentIn.array());
-            	}
-            	catch(Exception e)
-            	{
-            		e.printStackTrace();
-            	}
+            	MessageBuffer m = MessageBuffer.parseUnframed(currentIn.array());
             	ret = new Message(m);
             	break;
             case 1:
             	ret = new String(currentIn.array(), "utf-8");
             	break;
         	case 2:
-        		Destination d = new Destination();
-        		d.mergeUnframed(currentIn.array());
+        		DestinationBuffer d = DestinationBuffer.parseUnframed(currentIn.array());
         		ret = d;
         		break;
         	case 3:
-        		FlowControl c = new FlowControl();
-        		c.mergeUnframed(currentIn.array());
+        		FlowControlBuffer c = FlowControlBuffer.parseUnframed(currentIn.array());
         		ret = c;
         		break;
         	default:

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConnection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConnection.java?rev=745111&r1=745110&r2=745111&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConnection.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConnection.java Tue Feb 17 14:59:46 2009
@@ -8,6 +8,9 @@
 import org.apache.activemq.dispatch.IDispatcher;
 import org.apache.activemq.flow.Commands.Destination;
 import org.apache.activemq.flow.Commands.FlowControl;
+import org.apache.activemq.flow.Commands.Destination.DestinationBuffer;
+import org.apache.activemq.flow.Commands.FlowControl.FlowControlBean;
+import org.apache.activemq.flow.Commands.FlowControl.FlowControlBuffer;
 import org.apache.activemq.flow.ISinkController.FlowControllable;
 import org.apache.activemq.flow.MockBroker.DeliveryTarget;
 import org.apache.activemq.queue.SingleFlowRelay;
@@ -77,12 +80,12 @@
             } else if (command.getClass() == Message.class) {
                 Message msg = (Message) command;
                 inboundController.add(msg, null);
-            } else if (command.getClass() == Destination.class) {
+            } else if (command.getClass() == DestinationBuffer.class) {
                 // This is a subscription request
                 Destination destination = (Destination) command;
 
                 broker.subscribe(destination, this);
-            } else if (command.getClass() == FlowControl.class) {
+            } else if (command.getClass() == FlowControlBuffer.class) {
                 // This is a subscription request
                 FlowControl fc = (FlowControl) command;
                 synchronized (outputQueue) {
@@ -306,9 +309,9 @@
             if (!clientMode) {
                 available += size;
                 if (available >= capacity - resumeThreshold) {
-                    FlowControl fc = new FlowControl();
+                    FlowControlBean fc = new FlowControlBean();
                     fc.setCredit(available);
-                    write(fc);
+                    write(fc.freeze());
                     // System.out.println(RemoteConnection.this.name +
                     // " Send Release " + available + this);
                     available = 0;



Mime
View raw message