activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r743990 - in /activemq/sandbox/activemq-flow: ./ src/main/proto/ src/test/java/org/apache/activemq/flow/ src/test/resources/META-INF/services/org/apache/activemq/wireformat/
Date Fri, 13 Feb 2009 05:19:24 GMT
Author: chirino
Date: Fri Feb 13 05:19:23 2009
New Revision: 743990

URL: http://svn.apache.org/viewvc?rev=743990&view=rev
Log:
Switching to a protobuf based wireformat

Added:
    activemq/sandbox/activemq-flow/src/main/proto/
    activemq/sandbox/activemq-flow/src/main/proto/test.proto
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/ProtoWireFormatFactory.java
    activemq/sandbox/activemq-flow/src/test/resources/META-INF/services/org/apache/activemq/wireformat/proto
Removed:
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Destination.java
Modified:
    activemq/sandbox/activemq-flow/pom.xml
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/AbstractTestConnection.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/BrokerConnection.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Message.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBroker.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockQueue.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConnection.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConsumer.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteProducer.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Router.java

Modified: activemq/sandbox/activemq-flow/pom.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/pom.xml?rev=743990&r1=743989&r2=743990&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/pom.xml (original)
+++ activemq/sandbox/activemq-flow/pom.xml Fri Feb 13 05:19:23 2009
@@ -57,4 +57,20 @@
     
   </dependencies>
 
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.activemq.protobuf</groupId>
+        <artifactId>activemq-protobuf</artifactId>
+         <executions>
+          <execution>
+            <goals>
+              <goal>compile</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+
 </project>

Added: activemq/sandbox/activemq-flow/src/main/proto/test.proto
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/proto/test.proto?rev=743990&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/proto/test.proto (added)
+++ activemq/sandbox/activemq-flow/src/main/proto/test.proto Fri Feb 13 05:19:23 2009
@@ -0,0 +1,37 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+package org.apache.activemq.flow;
+
+option java_multiple_files = false;
+option java_outer_classname = "Commands";
+option deferred_decode = true;
+
+message Destination {
+  optional string name = 1;
+  optional bool ptp = 3;
+}
+
+message Message {
+  optional string msg = 1;
+  optional Destination dest=2;
+  optional int32 hopCount=3;
+  optional int64  msgId=4;
+  optional int32 producerId=5;
+  optional int32 priority=6;
+  repeated string property=7;
+}
+

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/AbstractTestConnection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/AbstractTestConnection.java?rev=743990&r1=743989&r2=743990&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/AbstractTestConnection.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/AbstractTestConnection.java Fri Feb 13 05:19:23 2009
@@ -22,15 +22,6 @@
 import org.apache.activemq.dispatch.IDispatcher;
 import org.apache.activemq.dispatch.IDispatcher.DispatchContext;
 import org.apache.activemq.dispatch.IDispatcher.Dispatchable;
-import org.apache.activemq.flow.AbstractLimitedFlowSource;
-import org.apache.activemq.flow.Flow;
-import org.apache.activemq.flow.FlowController;
-import org.apache.activemq.flow.IFlowDrain;
-import org.apache.activemq.flow.IFlowResource;
-import org.apache.activemq.flow.IFlowSink;
-import org.apache.activemq.flow.IFlowSource;
-import org.apache.activemq.flow.ISourceController;
-import org.apache.activemq.flow.SizeLimiter;
 import org.apache.activemq.flow.IFlowResource.FlowLifeCycleListener;
 import org.apache.activemq.flow.ISinkController.FlowControllable;
 import org.apache.activemq.queue.ExclusivePriorityQueue;

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/BrokerConnection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/BrokerConnection.java?rev=743990&r1=743989&r2=743990&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/BrokerConnection.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/BrokerConnection.java Fri Feb 13 05:19:23 2009
@@ -3,7 +3,6 @@
  */
 package org.apache.activemq.flow;
 
-import org.apache.activemq.flow.AbstractTestConnection.ReadReadyListener;
 
 class BrokerConnection extends AbstractTestConnection implements MockBroker.DeliveryTarget {
     private final Pipe<Message> pipe;
@@ -37,7 +36,7 @@
     protected void messageReceived(Message m, ISourceController<Message> controller) {
 
         m = new Message(m);
-        m.hopCount++;
+        m.incrementHopCount();
 
         local.router.route(controller, m);
     }
@@ -53,7 +52,7 @@
 
     public boolean match(Message message) {
         // Avoid loops:
-        if (message.hopCount > 0) {
+        if (message.getHopCount() > 0) {
             return false;
         }
 

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Message.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Message.java?rev=743990&r1=743989&r2=743990&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Message.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Message.java Fri Feb 13 05:19:23 2009
@@ -17,8 +17,8 @@
 package org.apache.activemq.flow;
 
 import java.io.Serializable;
-import java.util.HashSet;
 
+import org.apache.activemq.flow.Commands.Destination;
 import org.apache.activemq.queue.Mapper;
 
 public class Message implements Serializable {
@@ -27,7 +27,7 @@
 
     public static final Mapper<Integer, Message> PRIORITY_MAPPER = new Mapper<Integer, Message>() {
         public Integer map(Message element) {
-            return element.priority;
+            return element.getPriority();
         }
     };
 
@@ -40,34 +40,25 @@
     public static final short TYPE_FLOW_OPEN = 2;
     public static final short TYPE_FLOW_CLOSE = 3;
 
-    final String msg;
-    transient final Flow flow;
-    final Destination dest;
-    int hopCount;
-    HashSet<String> matchProps;
-    final long msgId;
-    final int producerId;
-    final int priority;
+    transient Flow flow;
+    private Commands.Message message = new Commands.Message();
 
     Message(long msgId, int producerId, String msg, Flow flow, Destination dest, int priority) {
-        this.msgId = msgId;
-        this.producerId = producerId;
-        this.msg = msg;
+        this.message.setMsgId(msgId);
+        this.message.setProducerId(producerId);
+        this.message.setMsg(msg);
+        this.message.setDest(dest);
+        this.message.setPriority(priority);
         this.flow = flow;
-        this.dest = dest;
-        this.priority = priority;
-        hopCount = 0;
     }
 
     Message(Message m) {
-        this.msgId = m.msgId;
-        this.producerId = m.producerId;
-        this.msg = m.msg;
+        this.message = m.message;
         this.flow = m.flow;
-        this.dest = m.dest;
-        this.matchProps = m.matchProps;
-        this.priority = m.priority;
-        hopCount = m.hopCount;
+    }
+
+    public Message(Commands.Message m) {
+        this.message=m;
     }
 
     public short type() {
@@ -75,17 +66,16 @@
     }
 
     public void setProperty(String matchProp) {
-        if (matchProps == null) {
-            matchProps = new HashSet<String>();
-        }
-        matchProps.add(matchProp);
+        Commands.Message clone = message.clone();
+        clone.addProperty(matchProp);
+        message = clone;
     }
 
     public boolean match(String matchProp) {
-        if (matchProps == null) {
+        if (!message.hasProperty()) {
             return false;
         }
-        return matchProps.contains(matchProp);
+        return message.getPropertyList().contains(matchProp);
     }
 
     public boolean isSystem() {
@@ -93,15 +83,17 @@
     }
 
     public void incrementHopCount() {
-        hopCount++;
+        Commands.Message clone = message.clone();
+        clone.setHopCount(message.getHopCount());
+        message = clone;
     }
 
     public final int getHopCount() {
-        return hopCount;
+        return message.getHopCount();
     }
 
     public final Destination getDestination() {
-        return dest;
+        return message.getDest();
     }
 
     public Flow getFlow() {
@@ -113,18 +105,22 @@
     }
 
     public int getPriority() {
-        return priority;
+        return message.getPriority();
     }
 
     public String toString() {
-        return "Message: " + msg + " flow + " + flow + " dest: " + dest;
+        return "Message: " + message.getMsg() + " flow + " + flow + " dest: " + message.getDest();
     }
 
     public long getMsgId() {
-        return msgId;
+        return message.getMsgId();
     }
 
     public int getProducerId() {
-        return producerId;
+        return message.getProducerId();
+    }
+
+    public Commands.Message getProto() {
+        return message;
     }
 }

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBroker.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBroker.java?rev=743990&r1=743989&r2=743990&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBroker.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBroker.java Fri Feb 13 05:19:23 2009
@@ -9,6 +9,7 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.activemq.dispatch.IDispatcher;
+import org.apache.activemq.flow.Commands.Destination;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportAcceptListener;
 import org.apache.activemq.transport.TransportFactory;
@@ -41,7 +42,7 @@
     }
 
     public void subscribe(Destination destination, DeliveryTarget deliveryTarget) {
-        if (destination.ptp) {
+        if (destination.getPtp()) {
             queues.get(destination).addConsumer(deliveryTarget);
         } else {
             router.bind(deliveryTarget, destination);

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java?rev=743990&r1=743989&r2=743990&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java Fri Feb 13 05:19:23 2009
@@ -25,6 +25,7 @@
 
 import org.apache.activemq.dispatch.IDispatcher;
 import org.apache.activemq.dispatch.PriorityPooledDispatcher;
+import org.apache.activemq.flow.Commands.Destination;
 import org.apache.activemq.metric.MetricAggregator;
 import org.apache.activemq.metric.Period;
 import org.apache.activemq.queue.Mapper;
@@ -323,8 +324,8 @@
 
         if (multibroker) {
             if( tcp ) {
-                sendBroker = createBroker("SendBroker", "tcp://localhost:10000?wireFormat=test");
-                rcvBroker = createBroker("RcvBroker", "tcp://localhost:20000?wireFormat=test");
+                sendBroker = createBroker("SendBroker", "tcp://localhost:10000?wireFormat=proto");
+                rcvBroker = createBroker("RcvBroker", "tcp://localhost:20000?wireFormat=proto");
             } else {
                 sendBroker = createBroker("SendBroker", "pipe://SendBroker");
                 rcvBroker = createBroker("RcvBroker", "pipe://RcvBroker");
@@ -333,7 +334,7 @@
             brokers.add(rcvBroker);
         } else {
             if( tcp ) {
-                sendBroker = rcvBroker = createBroker("Broker", "tcp://localhost:10000?wireFormat=test");
+                sendBroker = rcvBroker = createBroker("Broker", "tcp://localhost:10000?wireFormat=proto");
             } else {
                 sendBroker = rcvBroker = createBroker("Broker", "pipe://Broker");
             }
@@ -344,7 +345,9 @@
         Destination[] dests = new Destination[destCount];
 
         for (int i = 0; i < destCount; i++) {
-            dests[i] = new Destination("dest" + (i + 1), ptp);
+            dests[i] = new Destination();
+            dests[i].setName("dest" + (i + 1));
+            dests[i].setPtp(ptp);
             if (ptp) {
                 MockQueue queue = createQueue(sendBroker, dests[i]);
                 sendBroker.addQueue(queue);

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockQueue.java?rev=743990&r1=743989&r2=743990&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockQueue.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockQueue.java Fri Feb 13 05:19:23 2009
@@ -5,6 +5,7 @@
 
 import java.util.HashMap;
 
+import org.apache.activemq.flow.Commands.Destination;
 import org.apache.activemq.flow.MockBroker.DeliveryTarget;
 import org.apache.activemq.queue.IQueue;
 import org.apache.activemq.queue.Mapper;

Added: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/ProtoWireFormatFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/ProtoWireFormatFactory.java?rev=743990&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/ProtoWireFormatFactory.java (added)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/ProtoWireFormatFactory.java Fri Feb 13 05:19:23 2009
@@ -0,0 +1,73 @@
+package org.apache.activemq.flow;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.activemq.flow.Commands.Destination;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.wireformat.WireFormat;
+import org.apache.activemq.wireformat.WireFormatFactory;
+
+public class ProtoWireFormatFactory implements WireFormatFactory {
+
+    static public class TestWireFormat implements WireFormat {
+
+        public void marshal(Object value, DataOutput out) throws IOException {
+            if( value.getClass() == Message.class ) {
+                out.writeByte(0);
+                ((Message)value).getProto().writeFramed((OutputStream)out);
+            } else if( value.getClass() == String.class ) {
+                out.writeByte(1);
+                out.writeUTF((String) value);
+            } else if( value.getClass() == Destination.class ) {
+                out.writeByte(2);
+                ((Destination)value).writeFramed((OutputStream)out);
+            } else {
+                throw new IOException("Unsupported type: "+value.getClass());
+            }
+        }
+
+        public Object unmarshal(DataInput in) throws IOException {
+            byte type = in.readByte();
+            switch(type) {
+                case 0:
+                    Commands.Message m = new Commands.Message();
+                    m.mergeFramed((InputStream)in);
+                    return new Message(m);
+                case 1:
+                    return in.readUTF();
+                case 2:
+                    Destination d = new Destination();
+                    d.mergeFramed((InputStream)in);
+                    return d;
+                default:
+                    throw new IOException("Unknonw type byte: ");
+            }
+        }
+
+        public int getVersion() {
+            return 0;
+        }
+        public void setVersion(int version) {
+        }
+
+        public boolean inReceive() {
+            return false;
+        }
+
+        public ByteSequence marshal(Object value) throws IOException {
+            return null;
+        }
+        public Object unmarshal(ByteSequence data) throws IOException {
+            return null;
+        }
+    }
+
+	public WireFormat createWireFormat() {
+		return new TestWireFormat();
+	}	
+
+}

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConnection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConnection.java?rev=743990&r1=743989&r2=743990&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConnection.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConnection.java Fri Feb 13 05:19:23 2009
@@ -6,6 +6,7 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.activemq.dispatch.IDispatcher;
+import org.apache.activemq.flow.Commands.Destination;
 import org.apache.activemq.flow.ISinkController.FlowControllable;
 import org.apache.activemq.flow.MockBroker.DeliveryTarget;
 import org.apache.activemq.queue.ExclusivePriorityQueue;

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConsumer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConsumer.java?rev=743990&r1=743989&r2=743990&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConsumer.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConsumer.java Fri Feb 13 05:19:23 2009
@@ -4,6 +4,7 @@
 import java.net.URI;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.activemq.flow.Commands.Destination;
 import org.apache.activemq.metric.MetricAggregator;
 import org.apache.activemq.metric.MetricCounter;
 import org.apache.activemq.transport.Transport;

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteProducer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteProducer.java?rev=743990&r1=743989&r2=743990&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteProducer.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteProducer.java Fri Feb 13 05:19:23 2009
@@ -5,6 +5,7 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.activemq.flow.Commands.Destination;
 import org.apache.activemq.metric.MetricAggregator;
 import org.apache.activemq.metric.MetricCounter;
 import org.apache.activemq.transport.Transport;

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Router.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Router.java?rev=743990&r1=743989&r2=743990&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Router.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Router.java Fri Feb 13 05:19:23 2009
@@ -7,6 +7,7 @@
 import java.util.Collection;
 import java.util.HashMap;
 
+import org.apache.activemq.flow.Commands.Destination;
 import org.apache.activemq.flow.MockBroker.DeliveryTarget;
 
 public class Router {

Added: activemq/sandbox/activemq-flow/src/test/resources/META-INF/services/org/apache/activemq/wireformat/proto
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/resources/META-INF/services/org/apache/activemq/wireformat/proto?rev=743990&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/resources/META-INF/services/org/apache/activemq/wireformat/proto (added)
+++ activemq/sandbox/activemq-flow/src/test/resources/META-INF/services/org/apache/activemq/wireformat/proto Fri Feb 13 05:19:23 2009
@@ -0,0 +1,17 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+## 
+## http://www.apache.org/licenses/LICENSE-2.0
+## 
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+class=org.apache.activemq.flow.ProtoWireFormatFactory



Mime
View raw message