activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r704995 [3/3] - in /activemq/sandbox/chirino-pb/activemq-core: ./ src/main/java/org/apache/activemq/advisory/ src/main/java/org/apache/activemq/broker/ src/main/java/org/apache/activemq/command/ src/main/java/org/apache/activemq/pbwire/ src...
Date Wed, 15 Oct 2008 19:12:28 GMT
Modified: activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/SessionId.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/SessionId.java?rev=704995&r1=704994&r2=704995&view=diff
==============================================================================
--- activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/SessionId.java (original)
+++ activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/SessionId.java Wed Oct 15 12:12:26 2008
@@ -16,6 +16,8 @@
  */
 package org.apache.activemq.command;
 
+
+
 /**
  * 
  * @openwire:marshaller code="121"
@@ -25,8 +27,7 @@
 
     public static final byte DATA_STRUCTURE_TYPE = CommandTypes.SESSION_ID;
 
-    protected String connectionId;
-    protected long value;
+    protected PBSessionId pb = new PBSessionId(); 
 
     protected transient int hashCode;
     protected transient String key;
@@ -36,23 +37,24 @@
     }
 
     public SessionId(ConnectionId connectionId, long sessionId) {
-        this.connectionId = connectionId.getValue();
-        this.value = sessionId;
+        this.pb.setConnectionId(connectionId.getValue());
+        this.pb.setId(sessionId);
     }
 
     public SessionId(SessionId id) {
-        this.connectionId = id.getConnectionId();
-        this.value = id.getValue();
+        this.pb = id.pb.clone();
     }
 
     public SessionId(ProducerId id) {
-        this.connectionId = id.getConnectionId();
-        this.value = id.getSessionId();
+        this.pb = id.getPB().getSessionId();
     }
 
     public SessionId(ConsumerId id) {
-        this.connectionId = id.getConnectionId();
-        this.value = id.getSessionId();
+        this.pb = id.getPB().getSessionId();
+    }
+
+    public SessionId(PBSessionId sessionId) {
+        pb = sessionId;
     }
 
     public ConnectionId getParentId() {
@@ -64,7 +66,7 @@
 
     public int hashCode() {
         if (hashCode == 0) {
-            hashCode = connectionId.hashCode() ^ (int)value;
+            hashCode = this.pb.hashCode();
         }
         return hashCode;
     }
@@ -76,8 +78,8 @@
         if (o == null || o.getClass() != SessionId.class) {
             return false;
         }
-        SessionId id = (SessionId)o;
-        return value == id.value && connectionId.equals(id.connectionId);
+        
+        return this.pb.equals(((SessionId)o).pb);
     }
 
     public byte getDataStructureType() {
@@ -88,32 +90,33 @@
      * @openwire:property version=1 cache=true
      */
     public String getConnectionId() {
-        return connectionId;
+        return pb.getConnectionId();
     }
 
     public void setConnectionId(String connectionId) {
-        this.connectionId = connectionId;
+        this.pb.setConnectionId(connectionId);
     }
 
     /**
      * @openwire:property version=1
      */
     public long getValue() {
-        return value;
+        return pb.getId();
     }
 
     public void setValue(long sessionId) {
-        this.value = sessionId;
+        this.pb.setId(sessionId);
     }
 
     public String toString() {
-        if (key == null) {
-            key = connectionId + ":" + value;
-        }
-        return key;
+        return pb.toString();
     }
 
     public boolean isMarshallAware() {
         return false;
     }
+
+	public PBSessionId getPB() {
+		return pb;
+	}
 }

Modified: activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/SessionInfo.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/SessionInfo.java?rev=704995&r1=704994&r2=704995&view=diff
==============================================================================
--- activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/SessionInfo.java (original)
+++ activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/SessionInfo.java Wed Oct 15 12:12:26 2008
@@ -27,18 +27,21 @@
 
     public static final byte DATA_STRUCTURE_TYPE = CommandTypes.SESSION_INFO;
 
-    protected SessionId sessionId;
+    protected PBSessionInfo pb = new PBSessionInfo();
 
     public SessionInfo() {
-        sessionId = new SessionId();
     }
 
     public SessionInfo(ConnectionInfo connectionInfo, long sessionId) {
-        this.sessionId = new SessionId(connectionInfo.getConnectionId(), sessionId);
+        setSessionId(new SessionId(connectionInfo.getConnectionId(), sessionId));
     }
 
     public SessionInfo(SessionId sessionId) {
-        this.sessionId = sessionId;
+        setSessionId(sessionId);
+    }
+
+    public SessionInfo(PBSessionInfo pb) {
+        this.pb=pb;
     }
 
     public byte getDataStructureType() {
@@ -49,11 +52,18 @@
      * @openwire:property version=1 cache=true
      */
     public SessionId getSessionId() {
-        return sessionId;
+        if( pb.hasSessionId() ) {
+            return new SessionId(pb.getSessionId());
+        }
+        return null;
     }
 
     public void setSessionId(SessionId sessionId) {
-        this.sessionId = sessionId;
+        if( sessionId ==null ) {
+            pb.clearSessionId();
+        } else {
+            this.pb.setSessionId(sessionId.getPB());
+        }
     }
 
     public RemoveInfo createRemoveCommand() {
@@ -66,4 +76,8 @@
         return visitor.processAddSession(this);
     }
 
+    public PBSessionInfo getPBCommand() {
+        return pb;
+    }
+
 }

Modified: activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ShutdownInfo.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ShutdownInfo.java?rev=704995&r1=704994&r2=704995&view=diff
==============================================================================
--- activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ShutdownInfo.java (original)
+++ activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/ShutdownInfo.java Wed Oct 15 12:12:26 2008
@@ -26,6 +26,15 @@
 public class ShutdownInfo extends BaseCommand {
 
     public static final byte DATA_STRUCTURE_TYPE = CommandTypes.SHUTDOWN_INFO;
+    protected final PBShutdownInfo pb;
+    
+    public ShutdownInfo() {
+        this(new PBShutdownInfo());
+    }
+    
+    public ShutdownInfo(PBShutdownInfo pb) {
+        this.pb=pb;
+    }
 
     public byte getDataStructureType() {
         return DATA_STRUCTURE_TYPE;
@@ -39,4 +48,8 @@
         return true;
     }
 
+    public PBShutdownInfo getPBCommand() {
+        return pb;
+    }
+
 }

Modified: activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/TransactionId.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/TransactionId.java?rev=704995&r1=704994&r2=704995&view=diff
==============================================================================
--- activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/TransactionId.java (original)
+++ activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/TransactionId.java Wed Oct 15 12:12:26 2008
@@ -29,4 +29,7 @@
     public boolean isMarshallAware() {
         return false;
     }
+    
+    abstract public PBTransactionId getPB();
+
 }

Modified: activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/TransactionInfo.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/TransactionInfo.java?rev=704995&r1=704994&r2=704995&view=diff
==============================================================================
--- activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/TransactionInfo.java (original)
+++ activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/TransactionInfo.java Wed Oct 15 12:12:26 2008
@@ -18,6 +18,8 @@
 
 import java.io.IOException;
 
+import org.apache.activemq.command.PBTransactionInfo.PBTransactionType;
+import org.apache.activemq.pbwire.PBConversionSupport;
 import org.apache.activemq.state.CommandVisitor;
 
 /**
@@ -38,17 +40,19 @@
     public static final byte FORGET = 6;
     public static final byte END = 7;
 
-    protected byte type;
-    protected ConnectionId connectionId;
-    protected TransactionId transactionId;
+    protected PBTransactionInfo pb = new PBTransactionInfo();
 
     public TransactionInfo() {
     }
 
     public TransactionInfo(ConnectionId connectionId, TransactionId transactionId, byte type) {
-        this.connectionId = connectionId;
-        this.transactionId = transactionId;
-        this.type = type;
+        setConnectionId(connectionId);
+        setTransactionId(transactionId);
+        setType(type);
+    }
+
+    public TransactionInfo(PBTransactionInfo pb) {
+        this.pb=pb;
     }
 
     public byte getDataStructureType() {
@@ -59,56 +63,74 @@
      * @openwire:property version=1 cache=true
      */
     public ConnectionId getConnectionId() {
-        return connectionId;
+        if( pb.hasConnectionId() ) {
+            return new ConnectionId(pb.getConnectionId());
+        }
+        return null;
     }
 
     public void setConnectionId(ConnectionId connectionId) {
-        this.connectionId = connectionId;
+        if( connectionId==null ) {
+            pb.clearConnectionId();
+        } else {
+            this.pb.setConnectionId(connectionId.getValue());
+        }
     }
 
     /**
      * @openwire:property version=1 cache=true
      */
     public TransactionId getTransactionId() {
-        return transactionId;
+        if( pb.hasTransactionId() ) {
+            return PBConversionSupport.convert(pb.getTransactionId());
+        }
+        return null;
     }
 
     public void setTransactionId(TransactionId transactionId) {
-        this.transactionId = transactionId;
+        if( transactionId==null ) {
+            pb.clearTransactionId();
+        } else {
+            this.pb.setTransactionId(PBConversionSupport.convert(transactionId));
+        }
     }
 
     /**
      * @openwire:property version=1
      */
     public byte getType() {
-        return type;
+        return (byte) pb.getType().getNumber();
     }
 
     public void setType(byte type) {
-        this.type = type;
+        this.pb.setType(PBTransactionType.valueOf(type));
     }
 
     public Response visit(CommandVisitor visitor) throws Exception {
-        switch (type) {
-        case TransactionInfo.BEGIN:
+        switch (pb.getType()) {
+        case BEGIN:
             return visitor.processBeginTransaction(this);
-        case TransactionInfo.END:
+        case END:
             return visitor.processEndTransaction(this);
-        case TransactionInfo.PREPARE:
+        case PREPARE:
             return visitor.processPrepareTransaction(this);
-        case TransactionInfo.COMMIT_ONE_PHASE:
+        case COMMIT_ONE_PHASE:
             return visitor.processCommitTransactionOnePhase(this);
-        case TransactionInfo.COMMIT_TWO_PHASE:
+        case COMMIT_TWO_PHASE:
             return visitor.processCommitTransactionTwoPhase(this);
-        case TransactionInfo.ROLLBACK:
+        case ROLLBACK:
             return visitor.processRollbackTransaction(this);
-        case TransactionInfo.RECOVER:
+        case RECOVER:
             return visitor.processRecoverTransactions(this);
-        case TransactionInfo.FORGET:
+        case FORGET:
             return visitor.processForgetTransaction(this);
         default:
-            throw new IOException("Transaction info type unknown: " + type);
+            throw new IOException("Transaction info type unknown: " + pb.getType());
         }
     }
 
+    public PBTransactionInfo getPBCommand() {
+        return pb;
+    }
+
 }

Modified: activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/WireFormatInfo.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/WireFormatInfo.java?rev=704995&r1=704994&r2=704995&view=diff
==============================================================================
--- activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/WireFormatInfo.java (original)
+++ activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/WireFormatInfo.java Wed Oct 15 12:12:26 2008
@@ -24,6 +24,8 @@
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.activemq.pbwire.PBConversionSupport;
+import org.apache.activemq.pbwire.PBWireFormat;
 import org.apache.activemq.state.CommandVisitor;
 import org.apache.activemq.util.ByteArrayInputStream;
 import org.apache.activemq.util.ByteArrayOutputStream;
@@ -42,9 +44,10 @@
     private static final byte MAGIC[] = new byte[] {'A', 'c', 't', 'i', 'v', 'e', 'M', 'Q'};
 
     protected byte magic[] = MAGIC;
-    protected int version;
-    protected ByteSequence marshalledProperties;
 
+    protected PBWireFormatInfo pb = new PBWireFormatInfo();
+    
+    protected ByteSequence marshalledProperties;
     protected transient Map<String, Object> properties;
     private transient Endpoint from;
     private transient Endpoint to;
@@ -76,11 +79,11 @@
      * @openwire:property version=1
      */
     public int getVersion() {
-        return version;
+        return pb.getVersion();
     }
 
     public void setVersion(int version) {
-        this.version = version;
+        this.pb.setVersion(version);
     }
 
     /**
@@ -125,10 +128,13 @@
 
     public Object getProperty(String name) throws IOException {
         if (properties == null) {
-            if (marshalledProperties == null) {
-                return null;
+            if (marshalledProperties != null) {
+                properties = unmarsallProperties(marshalledProperties);
+            } else if( pb.hasProperty() ) {
+                properties = PBConversionSupport.convertMap(pb.getPropertyList());
+            } else {
+                properties = new HashMap<String, Object>();
             }
-            properties = unmarsallProperties(marshalledProperties);
         }
         return properties.get(name);
     }
@@ -136,10 +142,13 @@
     @SuppressWarnings("unchecked")
     public Map<String, Object> getProperties() throws IOException {
         if (properties == null) {
-            if (marshalledProperties == null) {
-                return Collections.EMPTY_MAP;
+            if (marshalledProperties != null) {
+                properties = unmarsallProperties(marshalledProperties);
+            } else if( pb.hasProperty() ) {
+                properties = PBConversionSupport.convertMap(pb.getPropertyList());
+            } else {
+                properties = new HashMap<String, Object>();
             }
-            properties = unmarsallProperties(marshalledProperties);
         }
         return Collections.unmodifiableMap(properties);
     }
@@ -150,17 +159,20 @@
     }
 
     public void setProperty(String name, Object value) throws IOException {
-        lazyCreateProperties();
+        loadAndClear();
         properties.put(name, value);
     }
 
-    protected void lazyCreateProperties() throws IOException {
+    protected void loadAndClear() throws IOException {
         if (properties == null) {
-            if (marshalledProperties == null) {
-                properties = new HashMap<String, Object>();
-            } else {
+            if (marshalledProperties != null) {
                 properties = unmarsallProperties(marshalledProperties);
                 marshalledProperties = null;
+            } else if( pb.hasProperty() ) {
+                properties = PBConversionSupport.convertMap(pb.getPropertyList());
+                pb.clearProperty();
+            } else {
+                properties = new HashMap<String, Object>();
             }
         }
     }
@@ -171,12 +183,16 @@
 
     public void beforeMarshall(WireFormat wireFormat) throws IOException {
         // Need to marshal the properties.
-        if (marshalledProperties == null && properties != null) {
-            ByteArrayOutputStream baos = new ByteArrayOutputStream();
-            DataOutputStream os = new DataOutputStream(baos);
-            MarshallingSupport.marshalPrimitiveMap(properties, os);
-            os.close();
-            marshalledProperties = baos.toByteSequence();
+        if( properties!=null ) {
+            if( wireFormat.getClass() == PBWireFormat.class && !pb.hasProperty() ) {
+                pb.setPropertyList(PBConversionSupport.convert(properties));
+            } else if (marshalledProperties == null) {
+                ByteArrayOutputStream baos = new ByteArrayOutputStream();
+                DataOutputStream os = new DataOutputStream(baos);
+                MarshallingSupport.marshalPrimitiveMap(properties, os);
+                os.close();
+                marshalledProperties = baos.toByteSequence();
+            }
         }
     }
 
@@ -296,7 +312,7 @@
             p = getProperties();
         } catch (IOException ignore) {
         }
-        return "WireFormatInfo { version=" + version + ", properties=" + p + ", magic=" + toString(magic) + "}";
+        return "WireFormatInfo { version=" + getVersion() + ", properties=" + p + ", magic=" + toString(magic) + "}";
     }
 
     private String toString(byte[] data) {
@@ -364,4 +380,12 @@
         return null;
     }
 
+    public PBWireFormatInfo getPBCommand() {
+        return pb;
+    }
+
+    public Command getCommandObject() {
+        return null;
+    }
+
 }

Modified: activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/XATransactionId.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/XATransactionId.java?rev=704995&r1=704994&r2=704995&view=diff
==============================================================================
--- activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/XATransactionId.java (original)
+++ activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/command/XATransactionId.java Wed Oct 15 12:12:26 2008
@@ -16,10 +16,12 @@
  */
 package org.apache.activemq.command;
 
-import java.util.Arrays;
 import javax.transaction.xa.Xid;
+
 import org.apache.activemq.util.HexSupport;
 
+import com.google.protobuf.ByteString;
+
 /**
  * @openwire:marshaller code="112"
  * @version $Revision: 1.6 $
@@ -28,9 +30,7 @@
 
     public static final byte DATA_STRUCTURE_TYPE = CommandTypes.ACTIVEMQ_XA_TRANSACTION_ID;
 
-    private int formatId;
-    private byte[] branchQualifier;
-    private byte[] globalTransactionId;
+    protected PBTransactionId pb = new PBTransactionId();
 
     private transient int hash;
     private transient String transactionKey;
@@ -39,9 +39,13 @@
     }
 
     public XATransactionId(Xid xid) {
-        this.formatId = xid.getFormatId();
-        this.globalTransactionId = xid.getGlobalTransactionId();
-        this.branchQualifier = xid.getBranchQualifier();
+        setFormatId(xid.getFormatId());
+        setGlobalTransactionId(xid.getGlobalTransactionId());
+        setBranchQualifier(xid.getBranchQualifier());
+    }
+
+    public XATransactionId(PBTransactionId pb) {
+        this.pb = pb;
     }
 
     public byte getDataStructureType() {
@@ -50,8 +54,8 @@
 
     public synchronized String getTransactionKey() {
         if (transactionKey == null) {
-            transactionKey = "XID:" + formatId + ":" + HexSupport.toHexFromBytes(globalTransactionId) + ":"
-                             + HexSupport.toHexFromBytes(branchQualifier);
+            transactionKey = "XID:" + getFormatId() + ":" + HexSupport.toHexFromBytes(getGlobalTransactionId()) + ":"
+                             + HexSupport.toHexFromBytes(getBranchQualifier());
         }
         return transactionKey;
     }
@@ -72,54 +76,41 @@
      * @openwire:property version=1
      */
     public int getFormatId() {
-        return formatId;
+        return pb.getXaTransactionId().getFormatId();
     }
 
     /**
      * @openwire:property version=1
      */
     public byte[] getGlobalTransactionId() {
-        return globalTransactionId;
+        return pb.getXaTransactionId().getGlobalTransactionId().toByteArray();
     }
 
     /**
      * @openwire:property version=1
      */
     public byte[] getBranchQualifier() {
-        return branchQualifier;
+        return pb.getXaTransactionId().getBranchQualifier().toByteArray();
     }
 
     public void setBranchQualifier(byte[] branchQualifier) {
-        this.branchQualifier = branchQualifier;
+        this.pb.getXaTransactionId().setBranchQualifier(ByteString.copyFrom(branchQualifier));
         this.hash = 0;
     }
 
     public void setFormatId(int formatId) {
-        this.formatId = formatId;
+        this.pb.getXaTransactionId().setFormatId(formatId);
         this.hash = 0;
     }
 
     public void setGlobalTransactionId(byte[] globalTransactionId) {
-        this.globalTransactionId = globalTransactionId;
+        this.pb.getXaTransactionId().setGlobalTransactionId(ByteString.copyFrom(globalTransactionId));
         this.hash = 0;
     }
 
     public int hashCode() {
         if (hash == 0) {
-            hash = formatId;
-            hash = hash(globalTransactionId, hash);
-            hash = hash(branchQualifier, hash);
-            if (hash == 0) {
-                hash = 0xaceace;
-            }
-        }
-        return hash;
-    }
-
-    private static int hash(byte[] bytes, int hash) {
-        int size = bytes.length;
-        for (int i = 0; i < size; i++) {
-            hash ^= bytes[i] << ((i % 4) * 8);
+            hash = pb.hashCode();
         }
         return hash;
     }
@@ -129,8 +120,7 @@
             return false;
         }
         XATransactionId xid = (XATransactionId)o;
-        return xid.formatId == formatId && Arrays.equals(xid.globalTransactionId, globalTransactionId)
-               && Arrays.equals(xid.branchQualifier, branchQualifier);
+        return xid.pb.equals(pb);
     }
 
     public int compareTo(Object o) {
@@ -141,4 +131,9 @@
         return getTransactionKey().compareTo(xid.getTransactionKey());
     }
 
+    @Override
+    public PBTransactionId getPB() {
+        return pb;
+    }
+
 }

Added: activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/pbwire/PBCommand.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/pbwire/PBCommand.java?rev=704995&view=auto
==============================================================================
--- activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/pbwire/PBCommand.java (added)
+++ activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/pbwire/PBCommand.java Wed Oct 15 12:12:26 2008
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.pbwire;
+
+import org.apache.activemq.command.PBCommandType;
+import org.apache.activemq.protobuf.Message;
+
+public interface PBCommand<T> extends Message<T> {
+    
+    public PBCommandType type();
+    
+    public Object visit(PBCommandVisitor visitor) throws java.io.IOException;
+
+}

Added: activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/pbwire/PBCommandVisitor.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/pbwire/PBCommandVisitor.java?rev=704995&view=auto
==============================================================================
--- activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/pbwire/PBCommandVisitor.java (added)
+++ activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/pbwire/PBCommandVisitor.java Wed Oct 15 12:12:26 2008
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.pbwire;
+
+import org.apache.activemq.command.PBBrokerInfo;
+import org.apache.activemq.command.PBConnectionControl;
+import org.apache.activemq.command.PBConnectionError;
+import org.apache.activemq.command.PBConnectionInfo;
+import org.apache.activemq.command.PBConsumerControl;
+import org.apache.activemq.command.PBConsumerInfo;
+import org.apache.activemq.command.PBControlCommand;
+import org.apache.activemq.command.PBDestinationInfo;
+import org.apache.activemq.command.PBFlushCommand;
+import org.apache.activemq.command.PBJournalQueueAck;
+import org.apache.activemq.command.PBJournalTopicAck;
+import org.apache.activemq.command.PBJournalTrace;
+import org.apache.activemq.command.PBJournalTransaction;
+import org.apache.activemq.command.PBKeepAliveInfo;
+import org.apache.activemq.command.PBMessage;
+import org.apache.activemq.command.PBMessageAck;
+import org.apache.activemq.command.PBMessageDispatch;
+import org.apache.activemq.command.PBMessageDispatchNotification;
+import org.apache.activemq.command.PBMessagePull;
+import org.apache.activemq.command.PBPartialCommand;
+import org.apache.activemq.command.PBProducerAck;
+import org.apache.activemq.command.PBProducerInfo;
+import org.apache.activemq.command.PBRemoveInfo;
+import org.apache.activemq.command.PBRemoveSubscriptionInfo;
+import org.apache.activemq.command.PBReplayCommand;
+import org.apache.activemq.command.PBResponse;
+import org.apache.activemq.command.PBSessionInfo;
+import org.apache.activemq.command.PBShutdownInfo;
+import org.apache.activemq.command.PBTransactionInfo;
+import org.apache.activemq.command.PBWireFormatInfo;
+
+public class PBCommandVisitor {
+
+    public Object visit(PBMessage pb) {
+        return null;
+    }
+
+    public Object visit(PBBrokerInfo pb) {
+        return null;
+    }
+
+    public Object visit(PBConnectionControl pb) {
+        return null;
+    }
+
+    public Object visit(PBConnectionError pb) {
+        return null;
+    }
+
+    public Object visit(PBConnectionInfo pb) {
+        return null;
+    }
+
+    public Object visit(PBConsumerControl pb) {
+        return null;
+    }
+
+    public Object visit(PBConsumerInfo pb) {
+        return null;
+    }
+
+    public Object visit(PBControlCommand pb) {
+        return null;
+    }
+
+    public Object visit(PBDestinationInfo pb) {
+        return null;
+    }
+
+    public Object visit(PBFlushCommand pb) {
+        return null;
+    }
+
+    public Object visit(PBKeepAliveInfo pb) {
+        return null;
+    }
+
+    public Object visit(PBMessageAck pb) {
+        return null;
+    }
+
+    public Object visit(PBMessagePull pb) {
+        return null;
+    }
+
+    public Object visit(PBPartialCommand pb) {
+        return null;
+    }
+
+    public Object visit(PBProducerAck pb) {
+        return null;
+    }
+
+    public Object visit(PBProducerInfo pb) {
+        return null;
+    }
+
+    public Object visit(PBRemoveInfo pb) {
+        return null;
+    }
+
+    public Object visit(PBRemoveSubscriptionInfo pb) {
+        return null;
+    }
+
+    public Object visit(PBReplayCommand pb) {
+        return null;
+    }
+
+    public Object visit(PBResponse pb) {
+        return null;
+    }
+
+    public Object visit(PBSessionInfo pb) {
+        return null;
+    }
+
+    public Object visit(PBShutdownInfo pb) {
+        return null;
+    }
+
+    public Object visit(PBTransactionInfo pb) {
+        return null;
+    }
+
+    public Object visit(PBWireFormatInfo pb) {
+        return null;
+    }
+
+    public Object visit(PBMessageDispatchNotification pb) {
+        return null;
+    }
+
+    public Object visit(PBMessageDispatch pb) {
+        return null;
+    }
+
+    public Object visit(PBJournalQueueAck pb) {
+        return null;
+    }
+
+    public Object visit(PBJournalTopicAck pb) {
+        return null;
+    }
+
+    public Object visit(PBJournalTrace pb) {
+        return null;
+    }
+
+    public Object visit(PBJournalTransaction pb) {
+        return null;
+    }
+
+}

Added: activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/pbwire/PBConversionSupport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/pbwire/PBConversionSupport.java?rev=704995&view=auto
==============================================================================
--- activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/pbwire/PBConversionSupport.java (added)
+++ activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/pbwire/PBConversionSupport.java Wed Oct 15 12:12:26 2008
@@ -0,0 +1,318 @@
+package org.apache.activemq.pbwire;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Map.Entry;
+
+import org.apache.activemq.command.ActiveMQBytesMessage;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMapMessage;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQObjectMessage;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQStreamMessage;
+import org.apache.activemq.command.ActiveMQTempQueue;
+import org.apache.activemq.command.ActiveMQTempTopic;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.BrokerId;
+import org.apache.activemq.command.LocalTransactionId;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.PBActiveMQDestination;
+import org.apache.activemq.command.PBKeyValue;
+import org.apache.activemq.command.PBMessage;
+import org.apache.activemq.command.PBStackTraceElement;
+import org.apache.activemq.command.PBThrowable;
+import org.apache.activemq.command.PBTransactionId;
+import org.apache.activemq.command.PBValue;
+import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.command.XATransactionId;
+import org.apache.activemq.openwire.v1.BaseDataStreamMarshaller;
+import org.apache.commons.collections.map.HashedMap;
+
+import com.google.protobuf.ByteString;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+public class PBConversionSupport {
+
+    /**
+     * Builds the protocol buffer form of a throwable, recursing through its
+     * chain of causes.
+     *
+     * @param e the throwable to translate; may be null
+     * @param includeStackTrace when true, the stack frames of the throwable
+     *        (and of every cause) are copied as well
+     * @return the populated PBThrowable, or null when e is null
+     */
+    public static PBThrowable convert(Throwable e, boolean includeStackTrace) {
+        if (e == null) {
+            return null;
+        }
+
+        PBThrowable pb = new PBThrowable();
+        pb.setClassName(e.getClass().getName());
+
+        // The message is optional: only set it when the throwable has one.
+        String text = e.getMessage();
+        if (text != null) {
+            pb.setMessage(text);
+        }
+
+        if (includeStackTrace) {
+            for (StackTraceElement frame : e.getStackTrace()) {
+                pb.addStackFrames(convert(frame));
+            }
+        }
+
+        Throwable cause = e.getCause();
+        if (cause != null) {
+            pb.setCause(convert(cause, includeStackTrace));
+        }
+        return pb;
+    }
+
+    /**
+     * Builds the protocol buffer form of a single stack frame.
+     *
+     * @param ste the stack frame to translate; may be null
+     * @return the populated PBStackTraceElement, or null when ste is null
+     */
+    public static PBStackTraceElement convert(StackTraceElement ste) {
+        if( ste == null ) {
+            return null;
+        }
+        PBStackTraceElement rc = new PBStackTraceElement();
+        rc.setClassName(ste.getClassName());
+        rc.setMethodName(ste.getMethodName());
+        // StackTraceElement.getFileName() returns null when the source file is
+        // unknown; leave the field unset instead of storing a null string,
+        // mirroring how a null Throwable message is handled.
+        if( ste.getFileName()!=null ) {
+            rc.setFileName(ste.getFileName());
+        }
+        rc.setLineNumber(ste.getLineNumber());
+        return rc;
+    }
+
+    static private Throwable createThrowable(String className, String message) {
+        try {
+            Class clazz = Class.forName(className, false, BaseDataStreamMarshaller.class.getClassLoader());
+            Constructor constructor = clazz.getConstructor(new Class[] {String.class});
+            return (Throwable)constructor.newInstance(new Object[] {message});
+        } catch (Throwable e) {
+            return new Throwable(className + ": " + message);
+        }
+    }
+
+    public static Throwable convert(PBThrowable pb, boolean includeStackTrace) {
+        if( pb == null ) {
+            return null;
+        }
+        Throwable rc = createThrowable(pb.getClassName(), pb.getMessage());
+
+        if( includeStackTrace && pb.hasStackFrames() ) {
+            StackTraceElement ss[] = new StackTraceElement[pb.getStackFramesCount()];
+            for (int i = 0; i < ss.length; i++) {
+                ss[i] = convert(pb.getStackFrames(i));
+            }
+            rc.setStackTrace(ss);
+        }
+        
+        if( pb.hasCause() ) {
+            rc.initCause(convert(pb.getCause(), includeStackTrace));
+        }
+        return rc;
+    }
+
+    public static StackTraceElement convert(PBStackTraceElement ste) {
+        StackTraceElement rc = new StackTraceElement(ste.getClassName(),
+                ste.getMethodName(), ste.getFileName(), ste.getLineNumber());
+        return rc;
+    }
+
+    /**
+     * Wraps a protocol buffer destination in the ActiveMQDestination subclass
+     * matching its type.
+     *
+     * @param destination the protocol buffer destination; may be null
+     * @return a queue, topic, temp queue or temp topic, or null when
+     *         destination is null
+     * @throws RuntimeException if the destination type is none of the four
+     *         handled values
+     */
+    public static ActiveMQDestination convert(PBActiveMQDestination destination) {
+        if (destination == null) {
+            return null;
+        }
+        switch (destination.getType()) {
+        case QUEUE:
+            return new ActiveMQQueue(destination);
+        case TOPIC:
+            return new ActiveMQTopic(destination);
+        case TEMP_QUEUE:
+            return new ActiveMQTempQueue(destination);
+        case TEMP_TOPIC:
+            return new ActiveMQTempTopic(destination);
+        default:
+            throw new RuntimeException("Unknown destination type: "+destination.getType());
+        }
+    }
+
+    /**
+     * Turns a list of broker id strings into an array of BrokerId objects,
+     * preserving order.
+     *
+     * @param brokers the broker id values; must not be null
+     * @return an array with one BrokerId per list entry
+     */
+    public static BrokerId[] convertBrokerIdList(List<String> brokers) {
+        BrokerId[] ids = new BrokerId[brokers.size()];
+        int slot = 0;
+        for (String name : brokers) {
+            ids[slot++] = new BrokerId(name);
+        }
+        return ids;
+    }
+    
+    /**
+     * Turns an array of BrokerId objects into a list of their string values,
+     * preserving order.
+     *
+     * @param brokerPath the broker ids; must not be null
+     * @return a list with one value per array entry
+     */
+    public static ArrayList<String> convertBrokerIdList(BrokerId[] brokerPath) {
+        ArrayList<String> names = new ArrayList<String>(brokerPath.length);
+        for (BrokerId id : brokerPath) {
+            names.add(id.getValue());
+        }
+        return names;
+    }
+
+    public static TransactionId convert(PBTransactionId transactionId) {
+        if( transactionId == null ) {
+            return null;
+        }
+        if( transactionId.hasLocalTransactionId() ) {
+            return new LocalTransactionId(transactionId);
+        }
+        if( transactionId.hasXaTransactionId() ) {
+            return new XATransactionId(transactionId);
+        }
+        return null;
+    }
+
+    /**
+     * Returns the protocol buffer object backing a transaction id.
+     *
+     * @param transactionId the transaction id; may be null
+     * @return the backing PBTransactionId, or null when transactionId is null
+     *         (consistent with the other convert overloads in this class)
+     */
+    public static PBTransactionId convert(TransactionId transactionId) {
+        if( transactionId == null ) {
+            return null;
+        }
+        return transactionId.getPB();
+    }
+
+    /**
+     * Converts a property map into a list of protocol buffer key/value pairs,
+     * in the map's iteration order.
+     *
+     * @param properties the properties to translate; may be null
+     * @return one PBKeyValue per map entry, or null when properties is null
+     * @throws RuntimeException if a value is of a type convertValue does not support
+     */
+    public static List<PBKeyValue> convert(Map<String, Object> properties) {
+        if (properties == null) {
+            return null;
+        }
+        ArrayList<PBKeyValue> pairs = new ArrayList<PBKeyValue>(properties.size());
+        for (Map.Entry<String, Object> property : properties.entrySet()) {
+            PBKeyValue pair = new PBKeyValue();
+            pair.setName(property.getKey());
+            pair.setValue(convertValue(property.getValue()));
+            pairs.add(pair);
+        }
+        return pairs;
+    }
+
+    public static List<PBValue> convert(List<Object> list) {
+        if( list ==null ) {
+            return null;
+        }
+        ArrayList<PBValue> rc = new ArrayList<PBValue>(list.size()); 
+        for (Object entry : list) {
+            rc.add(convertValue(entry));
+        }
+        return rc;
+    }
+    
+    /**
+     * Wraps a single plain value in a PBValue, choosing the field from the
+     * value's runtime type. Null, the boxed primitives, byte[], String, Map
+     * and List are supported; maps and lists are converted recursively.
+     *
+     * @param value the value to wrap; may be null
+     * @return a PBValue with exactly one field populated
+     * @throws RuntimeException if the value is of any other type
+     */
+    private static PBValue convertValue(Object value) {
+        PBValue pb = new PBValue();
+        // The boxed types and String are final, so instanceof matches exactly
+        // the same objects as an exact class comparison would.
+        if (value == null) {
+            pb.setNullValue(true);
+        } else if (value instanceof Boolean) {
+            pb.setBooleanValue((Boolean)value);
+        } else if (value instanceof Byte) {
+            pb.setByteValue((Byte)value);
+        } else if (value instanceof Character) {
+            pb.setCharValue((Character)value);
+        } else if (value instanceof Short) {
+            pb.setShortValue((Short)value);
+        } else if (value instanceof Integer) {
+            pb.setIntValue((Integer)value);
+        } else if (value instanceof Long) {
+            pb.setLongValue((Long)value);
+        } else if (value instanceof Float) {
+            pb.setFloatValue((Float)value);
+        } else if (value instanceof Double) {
+            pb.setDoubleValue((Double)value);
+        } else if (value instanceof byte[]) {
+            pb.setByteArrayValue(ByteString.copyFrom((byte[])value));
+        } else if (value instanceof String) {
+            pb.setStringValue((String)value);
+        } else if (value instanceof Map) {
+            pb.setMapValueList(convert((Map<String, Object>)value));
+        } else if (value instanceof List) {
+            pb.setListValueList(convert((List<Object>)value));
+        } else {
+            throw new RuntimeException("Object is not a primitive: " + value);
+        }
+        return pb;
+    }
+    
+    /**
+     * Unwraps a PBValue into the plain Java value it carries. Fields are
+     * probed in a fixed order and the first one present wins; list and map
+     * values are converted recursively.
+     *
+     * @param value the protocol buffer value; may be null
+     * @return the carried value, or null when value is null, is flagged as a
+     *         null value, or has no field set
+     */
+    private static Object convertValue(PBValue value) {
+        Object result = null;
+        if (value != null && !value.hasNullValue()) {
+            if (value.hasBooleanValue()) {
+                result = value.getBooleanValue();
+            } else if (value.hasByteArrayValue()) {
+                result = value.getByteArrayValue().toByteArray();
+            } else if (value.hasByteValue()) {
+                result = value.getByteValue();
+            } else if (value.hasCharValue()) {
+                result = value.getCharValue();
+            } else if (value.hasDoubleValue()) {
+                result = value.getDoubleValue();
+            } else if (value.hasFloatValue()) {
+                result = value.getFloatValue();
+            } else if (value.hasIntValue()) {
+                result = value.getIntValue();
+            } else if (value.hasListValue()) {
+                result = convertList(value.getListValueList());
+            } else if (value.hasLongValue()) {
+                result = value.getLongValue();
+            } else if (value.hasMapValue()) {
+                result = convertMap(value.getMapValueList());
+            } else if (value.hasShortValue()) {
+                result = value.getShortValue();
+            } else if (value.hasStringValue()) {
+                result = value.getStringValue();
+            }
+        }
+        return result;
+    }
+
+    /**
+     * Converts a list of protocol buffer key/value pairs back into a map.
+     * When a name occurs more than once, the last pair wins.
+     *
+     * @param propertyList the pairs to translate; must not be null
+     * @return a new map holding one entry per distinct name
+     */
+    public static Map<String, Object> convertMap(List<PBKeyValue> propertyList) {
+        Map<String, Object> properties = new HashMap<String, Object>(propertyList.size());
+        for (PBKeyValue pair : propertyList) {
+            String name = pair.getName();
+            Object plain = convertValue(pair.getValue());
+            properties.put(name, plain);
+        }
+        return properties;
+    }
+    public static List<Object> convertList(List<PBValue> list) {
+        ArrayList<Object> rc = new ArrayList<Object>(list.size());
+        for (PBValue value : list) {
+            rc.add(convertValue(value));
+        }
+        return rc;
+    }
+
+    /**
+     * Wraps a protocol buffer message in the Message subclass matching its
+     * message type.
+     *
+     * @param message the protocol buffer message; may be null
+     * @return the wrapping message object, or null when message is null
+     * @throws RuntimeException if the message type is none of the handled values
+     */
+    public static Message convert(PBMessage message) {
+        if( message == null ) {
+            return null;
+        }
+        switch( message.getMessageType() ) {
+        case EMPTY_MESSAGE:
+            return new ActiveMQMessage(message);
+        case BYTES_MESSAGE:
+            return new ActiveMQBytesMessage(message);
+        case MAP_MESSAGE:
+            return new ActiveMQMapMessage(message);
+        case OBJECT_MESSAGE:
+            return new ActiveMQObjectMessage(message);
+        case STREAM_MESSAGE:
+            return new ActiveMQStreamMessage(message);
+        case TEXT_MESSAGE:
+            return new ActiveMQTextMessage(message);
+        }
+        // Spelling of "Unknown" corrected in the diagnostic below.
+        throw new RuntimeException("Unknown message type: "+message.getMessageType());
+    }
+}

Added: activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/pbwire/PBWireFormat.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/pbwire/PBWireFormat.java?rev=704995&view=auto
==============================================================================
--- activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/pbwire/PBWireFormat.java (added)
+++ activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/pbwire/PBWireFormat.java Wed Oct 15 12:12:26 2008
@@ -0,0 +1,288 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.pbwire;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.activemq.command.BrokerInfo;
+import org.apache.activemq.command.Command;
+import org.apache.activemq.command.ConnectionControl;
+import org.apache.activemq.command.ConnectionError;
+import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.DestinationInfo;
+import org.apache.activemq.command.JournalQueueAck;
+import org.apache.activemq.command.JournalTopicAck;
+import org.apache.activemq.command.JournalTrace;
+import org.apache.activemq.command.JournalTransaction;
+import org.apache.activemq.command.KeepAliveInfo;
+import org.apache.activemq.command.MarshallAware;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageDispatch;
+import org.apache.activemq.command.PBBrokerInfo;
+import org.apache.activemq.command.PBCommandHeader;
+import org.apache.activemq.command.PBConnectionControl;
+import org.apache.activemq.command.PBConnectionError;
+import org.apache.activemq.command.PBConnectionInfo;
+import org.apache.activemq.command.PBConsumerInfo;
+import org.apache.activemq.command.PBDestinationInfo;
+import org.apache.activemq.command.PBJournalQueueAck;
+import org.apache.activemq.command.PBJournalTopicAck;
+import org.apache.activemq.command.PBJournalTrace;
+import org.apache.activemq.command.PBJournalTransaction;
+import org.apache.activemq.command.PBKeepAliveInfo;
+import org.apache.activemq.command.PBMessage;
+import org.apache.activemq.command.PBMessageAck;
+import org.apache.activemq.command.PBMessageDispatch;
+import org.apache.activemq.command.PBProducerInfo;
+import org.apache.activemq.command.PBRemoveInfo;
+import org.apache.activemq.command.PBResponse;
+import org.apache.activemq.command.PBSessionInfo;
+import org.apache.activemq.command.PBShutdownInfo;
+import org.apache.activemq.command.PBTransactionInfo;
+import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.ProtocolBufferBacked;
+import org.apache.activemq.command.RemoveInfo;
+import org.apache.activemq.command.Response;
+import org.apache.activemq.command.SessionInfo;
+import org.apache.activemq.command.ShutdownInfo;
+import org.apache.activemq.command.TransactionInfo;
+import org.apache.activemq.command.WireFormatInfo;
+import org.apache.activemq.protobuf.BaseMessage;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.wireformat.WireFormat;
+
+import com.google.protobuf.CodedInputStream;
+import com.google.protobuf.CodedOutputStream;
+
+/**
+ * 
+ * @version $Revision$
+ */
+public final class PBWireFormat implements WireFormat {
+
+    private final static PBCommandVisitor COMMAND_TRANSLATER= new CommandTranslater();
+
+    /**
+     * Visitor that turns each decoded protocol buffer command into the
+     * command object wrapping it. Command types without an override here
+     * fall back to the PBCommandVisitor implementation.
+     *
+     * Every method is marked with Override so that a signature change in
+     * PBCommandVisitor becomes a compile error instead of silently
+     * disabling the translation.
+     */
+    static private final class CommandTranslater extends PBCommandVisitor {
+        @Override
+        public Object visit(PBMessage pb) {
+            // Messages need a type-specific wrapper, chosen by the conversion helper.
+            return PBConversionSupport.convert(pb);
+        }
+
+        @Override
+        public Object visit(PBBrokerInfo pb) {
+            return new BrokerInfo(pb);
+        }
+
+        @Override
+        public Object visit(PBConnectionControl pb) {
+            return new ConnectionControl(pb);
+        }
+
+        @Override
+        public Object visit(PBConnectionError pb) {
+            return new ConnectionError(pb);
+        }
+
+        @Override
+        public Object visit(PBConnectionInfo pb) {
+            return new ConnectionInfo(pb);
+        }
+
+        @Override
+        public Object visit(PBSessionInfo pb) {
+            return new SessionInfo(pb);
+        }
+
+        @Override
+        public Object visit(PBConsumerInfo pb) {
+            return new ConsumerInfo(pb);
+        }
+
+        @Override
+        public Object visit(PBDestinationInfo pb) {
+            return new DestinationInfo(pb);
+        }
+
+        @Override
+        public Object visit(PBProducerInfo pb) {
+            return new ProducerInfo(pb);
+        }
+
+        @Override
+        public Object visit(PBMessageDispatch pb) {
+            return new MessageDispatch(pb);
+        }
+
+        @Override
+        public Object visit(PBKeepAliveInfo pb) {
+            return new KeepAliveInfo(pb);
+        }
+
+        @Override
+        public Object visit(PBMessageAck pb) {
+            return new MessageAck(pb);
+        }
+
+        @Override
+        public Object visit(PBResponse pb) {
+            return new Response(pb);
+        }
+
+        @Override
+        public Object visit(PBTransactionInfo pb) {
+            return new TransactionInfo(pb);
+        }
+
+        @Override
+        public Object visit(PBRemoveInfo pb) {
+            return new RemoveInfo(pb);
+        }
+
+        @Override
+        public Object visit(PBJournalQueueAck pb) {
+            return new JournalQueueAck(pb);
+        }
+
+        @Override
+        public Object visit(PBJournalTopicAck pb) {
+            return new JournalTopicAck(pb);
+        }
+
+        @Override
+        public Object visit(PBJournalTrace pb) {
+            return new JournalTrace(pb);
+        }
+
+        @Override
+        public Object visit(PBJournalTransaction pb) {
+            return new JournalTransaction(pb);
+        }
+
+        @Override
+        public Object visit(PBShutdownInfo pb) {
+            return new ShutdownInfo(pb);
+        }
+    }
+
+    private int version;
+    private WireFormatInfo preferedWireFormatInfo;
+
+    /** Stores the wire format version; the value is only recorded, not otherwise used by this class. */
+    public void setVersion(int version) {
+        this.version = version;
+    }
+    /** Returns the version last passed to setVersion (0 if it was never set). */
+    public int getVersion() {
+        return version;
+    }
+
+    /**
+     * Serializes a command into a byte sequence made of a framed
+     * PBCommandHeader followed by the framed protocol buffer command.
+     *
+     * @param o the command to serialize; must implement ProtocolBufferBacked
+     * @return a byte sequence holding exactly the two framed messages
+     * @throws IOException if the command has no protocol buffer
+     *         representation or encoding fails
+     */
+    public ByteSequence marshal(Object o) throws IOException {
+        ProtocolBufferBacked command = (ProtocolBufferBacked) o;
+
+        if( command instanceof MarshallAware ) {
+            ((MarshallAware)command).beforeMarshall(this);
+        }
+
+        // The header carries the command id and response flag, when the
+        // object being marshalled is backed by a Command.
+        PBCommandHeader header = new PBCommandHeader();
+        Command cmd = command.getCommandObject();
+        if( cmd!=null ) {
+            header.setCommandId(cmd.getCommandId());
+            if( cmd.isResponseRequired() ) {
+                header.setResponseRequired(true);
+            }
+        }
+
+        PBCommand message = command.getPBCommand();
+        if( message == null ) {
+            // Some implementations return null here; report that clearly
+            // instead of failing with a NullPointerException below.
+            throw new IOException("Cannot marshal " + o.getClass().getName() + ": it has no protocol buffer command");
+        }
+        header.setType(message.type());
+
+        // The buffer is sized exactly; checkNoSpaceLeft() verifies that claim.
+        byte data[] = new byte[header.serializedSizeFramed() + message.serializedSizeFramed()];
+        CodedOutputStream os = CodedOutputStream.newInstance(data);
+        header.writeFramed(os);
+        message.writeFramed(os);
+        os.checkNoSpaceLeft();
+
+        return new ByteSequence(data);
+    }
+
+    /**
+     * Serializes a command to a stream as a framed PBCommandHeader followed
+     * by the framed protocol buffer command, then flushes the encoder.
+     *
+     * @param o the command to serialize; must implement ProtocolBufferBacked
+     * @param out the destination; it is cast to OutputStream, so it must
+     *        also be an OutputStream (for example a DataOutputStream)
+     * @throws IOException if the command has no protocol buffer
+     *         representation or writing fails
+     */
+    public void marshal(Object o, DataOutput out) throws IOException {
+        ProtocolBufferBacked command = (ProtocolBufferBacked) o;
+
+        if( command instanceof MarshallAware ) {
+            ((MarshallAware)command).beforeMarshall(this);
+        }
+
+        // The header carries the command id and response flag, when the
+        // object being marshalled is backed by a Command.
+        PBCommandHeader header = new PBCommandHeader();
+        Command cmd = command.getCommandObject();
+        if( cmd!=null ) {
+            header.setCommandId(cmd.getCommandId());
+            if( cmd.isResponseRequired() ) {
+                header.setResponseRequired(true);
+            }
+        }
+
+        PBCommand message = command.getPBCommand();
+        if( message == null ) {
+            // Some implementations return null here; report that clearly
+            // instead of failing with a NullPointerException below.
+            throw new IOException("Cannot marshal " + o.getClass().getName() + ": it has no protocol buffer command");
+        }
+        header.setType(message.type());
+
+        CodedOutputStream os = CodedOutputStream.newInstance((OutputStream) out);
+        header.writeFramed(os);
+        message.writeFramed(os);
+        os.flush();
+    }
+
+
+    /**
+     * Decodes a command from a byte sequence holding a framed
+     * PBCommandHeader followed by the framed protocol buffer command.
+     *
+     * @param packet the encoded bytes; it is compacted in place before decoding
+     * @return the command object wrapping the decoded protocol buffer
+     * @throws IOException if decoding fails or the command type has no
+     *         translation to a command object
+     */
+    public Object unmarshal(ByteSequence packet) throws IOException {
+        packet.compact();
+        CodedInputStream is = CodedInputStream.newInstance(packet.getData());
+        PBCommandHeader header = PBCommandHeader.parseFramed(is);
+        PBCommand message = (PBCommand)header.getType().createMessage();
+
+        if( message instanceof MarshallAware ) {
+            ((MarshallAware)message).beforeUnmarshall(this);
+        }
+
+        message.mergeFramed(is);
+        ProtocolBufferBacked rc = (ProtocolBufferBacked) message.visit(COMMAND_TRANSLATER);
+        if( rc == null ) {
+            // The visitor yields null for command types it does not translate;
+            // fail with a descriptive error rather than a NullPointerException.
+            throw new IOException("Unsupported command type: " + header.getType());
+        }
+
+        // Restore the envelope fields that travel in the header.
+        Command cmd = rc.getCommandObject();
+        if( cmd!=null ) {
+            cmd.setCommandId(header.getCommandId());
+            cmd.setResponseRequired(header.getResponseRequired());
+        }
+        return rc;
+    }
+
+    /**
+     * Decodes a command from a stream holding a framed PBCommandHeader
+     * followed by the framed protocol buffer command.
+     *
+     * @param in the source; it is cast to InputStream, so it must also be an
+     *        InputStream (for example a DataInputStream)
+     * @return the command object wrapping the decoded protocol buffer
+     * @throws IOException if reading or decoding fails, a frame length is
+     *         negative, or the command type has no translation to a
+     *         command object
+     */
+    public Object unmarshal(DataInput in) throws IOException {
+
+        // The frames are read by hand because a CodedInputStream created on
+        // top of the input would consume bytes beyond the current command.
+        InputStream is = (InputStream) in;
+
+        PBCommandHeader header = PBCommandHeader.parseUnframed(readFrame(is, in));
+        PBCommand message = (PBCommand)header.getType().createMessage();
+
+        message.mergeUnframed(readFrame(is, in));
+        ProtocolBufferBacked rc = (ProtocolBufferBacked) message.visit(COMMAND_TRANSLATER);
+        if( rc == null ) {
+            // The visitor yields null for command types it does not translate;
+            // fail with a descriptive error rather than a NullPointerException.
+            throw new IOException("Unsupported command type: " + header.getType());
+        }
+
+        // Restore the envelope fields that travel in the header.
+        Command cmd = rc.getCommandObject();
+        if( cmd!=null ) {
+            cmd.setCommandId(header.getCommandId());
+            cmd.setResponseRequired(header.getResponseRequired());
+        }
+        return rc;
+    }
+
+    /** Reads one varint length prefix followed by that many bytes of payload. */
+    private static byte[] readFrame(InputStream is, DataInput in) throws IOException {
+        int size = BaseMessage.readRawVarint32(is);
+        if( size < 0 ) {
+            // A corrupt or hostile length prefix must not surface as
+            // NegativeArraySizeException.
+            throw new IOException("Invalid frame size: " + size);
+        }
+        byte data[] = new byte[size];
+        in.readFully(data);
+        return data;
+    }
+    
+    /** Stores the preferred wire format settings; within this class the value is only recorded. */
+    public void setPreferedWireFormatInfo(WireFormatInfo preferedWireFormatInfo) {
+        this.preferedWireFormatInfo = preferedWireFormatInfo;
+    }
+}

Propchange: activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/pbwire/PBWireFormat.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/pbwire/PBWireFormatFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/pbwire/PBWireFormatFactory.java?rev=704995&view=auto
==============================================================================
--- activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/pbwire/PBWireFormatFactory.java (added)
+++ activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/pbwire/PBWireFormatFactory.java Wed Oct 15 12:12:26 2008
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.pbwire;
+
+import org.apache.activemq.command.WireFormatInfo;
+import org.apache.activemq.wireformat.WireFormat;
+import org.apache.activemq.wireformat.WireFormatFactory;
+
+/**
+ * @version $Revision$
+ */
+public class PBWireFormatFactory implements WireFormatFactory {
+
+    //
+    // These defaults are the values the wire format ends up with after a
+    // default negotiation.
+    //
+
+    private int version = 1;
+    private boolean stackTraceEnabled = true;
+    private boolean tcpNoDelayEnabled = true;
+    private long maxInactivityDuration = 30 * 1000;
+    private long maxInactivityDurationInitalDelay = 10 * 1000;
+
+    /**
+     * Creates a PBWireFormat whose preferred settings are taken from the
+     * current property values of this factory.
+     *
+     * @return a new, configured wire format
+     * @throws IllegalStateException if the settings cannot be applied to the
+     *         WireFormatInfo; the original failure is attached as the cause
+     */
+    public WireFormat createWireFormat() {
+        WireFormatInfo preferred = describePreferences();
+        PBWireFormat format = new PBWireFormat();
+        format.setPreferedWireFormatInfo(preferred);
+        return format;
+    }
+
+    /** Copies the factory properties into a fresh WireFormatInfo. */
+    private WireFormatInfo describePreferences() {
+        WireFormatInfo info = new WireFormatInfo();
+        info.setVersion(version);
+
+        try {
+            info.setStackTraceEnabled(stackTraceEnabled);
+            info.setTcpNoDelayEnabled(tcpNoDelayEnabled);
+            info.setMaxInactivityDuration(maxInactivityDuration);
+            info.setMaxInactivityDurationInitalDelay(maxInactivityDurationInitalDelay);
+        } catch (Exception e) {
+            throw new IllegalStateException("Could not configure WireFormatInfo", e);
+        }
+        return info;
+    }
+
+    /** Returns the wire format version to advertise. */
+    public int getVersion() {
+        return version;
+    }
+
+    /** Sets the wire format version to advertise. */
+    public void setVersion(int version) {
+        this.version = version;
+    }
+
+    /** Returns the stack trace flag copied into the preferred settings. */
+    public boolean isStackTraceEnabled() {
+        return stackTraceEnabled;
+    }
+
+    /** Sets the stack trace flag copied into the preferred settings. */
+    public void setStackTraceEnabled(boolean stackTraceEnabled) {
+        this.stackTraceEnabled = stackTraceEnabled;
+    }
+
+    /** Returns the TCP no-delay flag copied into the preferred settings. */
+    public boolean isTcpNoDelayEnabled() {
+        return tcpNoDelayEnabled;
+    }
+
+    /** Sets the TCP no-delay flag copied into the preferred settings. */
+    public void setTcpNoDelayEnabled(boolean tcpNoDelayEnabled) {
+        this.tcpNoDelayEnabled = tcpNoDelayEnabled;
+    }
+
+    /** Returns the maximum inactivity duration (defaults to 30*1000). */
+    public long getMaxInactivityDuration() {
+        return maxInactivityDuration;
+    }
+
+    /** Sets the maximum inactivity duration. */
+    public void setMaxInactivityDuration(long maxInactivityDuration) {
+        this.maxInactivityDuration = maxInactivityDuration;
+    }
+
+    /** Returns the initial delay of the inactivity check (defaults to 10*1000). */
+    public long getMaxInactivityDurationInitalDelay() {
+        return maxInactivityDurationInitalDelay;
+    }
+
+    /** Sets the initial delay of the inactivity check. */
+    public void setMaxInactivityDurationInitalDelay(
+            long maxInactivityDurationInitalDelay) {
+        this.maxInactivityDurationInitalDelay = maxInactivityDurationInitalDelay;
+    }
+}

Propchange: activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/pbwire/PBWireFormatFactory.java
------------------------------------------------------------------------------
    svn:executable = *

Modified: activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/transport/TransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/transport/TransportFactory.java?rev=704995&r1=704994&r2=704995&view=diff
==============================================================================
--- activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/transport/TransportFactory.java (original)
+++ activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/transport/TransportFactory.java Wed Oct 15 12:12:26 2008
@@ -213,7 +213,7 @@
     }
 
     protected WireFormatFactory createWireFormatFactory(Map<String, String> options) throws IOException {
-        String wireFormat = (String)options.get("wireFormat");
+        String wireFormat = (String)options.remove("wireFormat");
         if (wireFormat == null) {
             wireFormat = getDefaultWireFormatType();
         }

Modified: activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompFrame.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompFrame.java?rev=704995&r1=704994&r2=704995&view=diff
==============================================================================
--- activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompFrame.java (original)
+++ activemq/sandbox/chirino-pb/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompFrame.java Wed Oct 15 12:12:26 2008
@@ -17,7 +17,6 @@
 package org.apache.activemq.transport.stomp;
 
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
@@ -25,6 +24,7 @@
 import org.apache.activemq.command.Command;
 import org.apache.activemq.command.Endpoint;
 import org.apache.activemq.command.Response;
+import org.apache.activemq.pbwire.PBCommand;
 import org.apache.activemq.state.CommandVisitor;
 
 /**
@@ -170,4 +170,12 @@
         }
         return buffer.toString();
     }
+
+    /** Always returns null: this frame provides no protocol buffer command. */
+    public PBCommand getPBCommand() {
+        return null;
+    }
+
+    /** Always returns null: this frame provides no backing command object. */
+    public Command getCommandObject() {
+        return null;
+    }
 }

Added: activemq/sandbox/chirino-pb/activemq-core/src/main/proto/pbwire.proto
URL: http://svn.apache.org/viewvc/activemq/sandbox/chirino-pb/activemq-core/src/main/proto/pbwire.proto?rev=704995&view=auto
==============================================================================
--- activemq/sandbox/chirino-pb/activemq-core/src/main/proto/pbwire.proto (added)
+++ activemq/sandbox/chirino-pb/activemq-core/src/main/proto/pbwire.proto Wed Oct 15 12:12:26 2008
@@ -0,0 +1,550 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+package org.apache.activemq.command;
+
+option java_multiple_files = true;
+option java_outer_classname = "PB";
+
+//| option deferred_decode = true;
+//| option auto_clear_optional_fields = true;
+
+message PBSessionId {
+    required string connection_id=1;
+    required int64 id=2;
+}
+
+message PBProducerId {
+    required PBSessionId session_id=1;
+    required int64 id=2;
+}
+
+message PBConsumerId {
+    required PBSessionId session_id=1;
+    required int64 id=2;
+}
+
+message PBMessageId {
+    required PBProducerId producer_id=1;
+    required int64 id=2;
+    optional int64 broker_sequence_id=3;
+}
+
+message PBActiveMQDestination { // A destination described by its kind (type) and its name; both are required.
+  enum DestinationType { // The four destination kinds declared by this schema.
+    QUEUE = 0;
+    TOPIC = 1;
+    TEMP_QUEUE = 2;
+    TEMP_TOPIC = 3;
+  }
+  // NOTE(review): `type` is required, so the `default = QUEUE` looks redundant -- confirm how the generator treats it.
+  required DestinationType type = 1 [default = QUEUE];
+  required string name = 2;
+}
+
+message PBTransactionId { // Holds a local and/or an XA transaction id; both members are optional.
+	// option type=union;  NOTE(review): disabled; presumably exactly one member is meant to be set -- confirm.
+	optional PBLocalTransactionId local_transaction_id=1;
+	optional PBXATransactionId xa_transaction_id=2;
+}
+
+message PBLocalTransactionId {
+  required string connection_id=1;
+  required int64 id=2;
+}
+
+message PBXATransactionId {
+  required int32 format_id = 1;
+  required bytes branch_qualifier = 2;
+  required bytes global_transaction_id = 3;
+}
+
+message PBValue {
+	optional int32 byte_value=1;
+	optional bool boolean_value=2;
+	optional int32 char_value=3;
+	optional int32 short_value=4;
+	optional int32 int_value=5;
+	optional int64 long_value=6;
+	optional float float_value=7;
+	optional double double_value=8;
+	optional bytes byte_array_value=9;
+	optional string string_value=10;	
+	repeated PBKeyValue map_value=11;
+	repeated PBValue list_value=12;
+	optional bool null_value=13;
+}
+
+message PBKeyValue {
+	required string name=1;
+	required PBValue value=2;
+}
+
+message PBCommandHeader {
+    optional int32 command_id=1;
+    optional PBCommandType type=2;
+    optional bool response_required=4;
+}
+
+message PBStackTraceElement {
+    optional string class_name=1; 
+    optional string method_name=2;
+    optional string file_name=3; 
+    optional int32 line_number=4;
+}
+
+message PBThrowable {
+    optional string class_name=1;
+    optional string message=2;
+	repeated PBStackTraceElement stack_frames=3;
+    optional PBThrowable cause=4;
+}
+
+enum PBCommandType { // Numeric tag for each PB* command message; every value must be unique.
+  //| option java_create_message="true";
+  P_B_MESSAGE = 0;
+  P_B_BROKER_INFO = 1;
+  P_B_CONNECTION_CONTROL=2;
+  P_B_CONNECTION_ERROR=3;
+  P_B_CONNECTION_INFO=4;
+  P_B_CONSUMER_CONTROL=5;
+  P_B_CONSUMER_INFO=6;
+  P_B_CONTROL_COMMAND=7;
+  P_B_DESTINATION_INFO=8;
+  P_B_FLUSH_COMMAND=29; // was 8, which collided with P_B_DESTINATION_INFO; 29 is the first unused value
+  P_B_KEEP_ALIVE_INFO=9;
+  P_B_MESSAGE_ACK=10;
+  P_B_PRODUCER_INFO=11;
+  P_B_MESSAGE_PULL=12;
+  P_B_PARTIAL_COMMAND=13;
+  P_B_PRODUCER_ACK=14;
+  P_B_REMOVE_INFO=15;
+  P_B_REMOVE_SUBSCRIPTION_INFO=16;
+  P_B_REPLAY_COMMAND=17;
+  P_B_RESPONSE=18;
+  P_B_SESSION_INFO=19;
+  P_B_SHUTDOWN_INFO=20;
+  P_B_TRANSACTION_INFO=21;
+  P_B_WIRE_FORMAT_INFO=22;
+  P_B_MESSAGE_DISPATCH=23;
+  P_B_MESSAGE_DISPATCH_NOTIFICATION=24;
+  P_B_JOURNAL_TRACE=25;
+  P_B_JOURNAL_TRANSACTION=26;
+  P_B_JOURNAL_TOPIC_ACK=27;
+  P_B_JOURNAL_QUEUE_ACK=28;
+}
+
+message PBJournalTrace {
+    //| option java_type_method = "PBCommandType";
+    //| option java_implments = "org.apache.activemq.pbwire.PBCommand<PBJournalTrace>";
+    //| option java_visitor = "org.apache.activemq.pbwire.PBCommandVisitor:Object:java.io.IOException";
+
+	optional string message=1;
+}
+message PBJournalTransaction {
+    //| option java_type_method = "PBCommandType";
+    //| option java_implments = "org.apache.activemq.pbwire.PBCommand<PBJournalTransaction>";
+    //| option java_visitor = "org.apache.activemq.pbwire.PBCommandVisitor:Object:java.io.IOException";
+
+    optional int32 type=1;
+    optional bool wasPrepared=2;
+    optional PBTransactionId transactionId=3;
+}
+message PBJournalTopicAck { // Names the destination, client, subscription and message of an ack, plus an optional transaction.
+    //| option java_type_method = "PBCommandType";
+    //| option java_implments = "org.apache.activemq.pbwire.PBCommand<PBJournalTopicAck>";
+    //| option java_visitor = "org.apache.activemq.pbwire.PBCommandVisitor:Object:java.io.IOException";
+    // NOTE(review): "subscritionName" looks like a typo of "subscriptionName"; renaming it changes the generated accessors.
+    optional PBActiveMQDestination destination=1;
+    optional string clientId=2;
+    optional string subscritionName=3;
+    optional PBMessageId messageId=4;
+    optional int64 messageSequenceId=5;
+    optional PBTransactionId transactionId=6;
+}
+message PBJournalQueueAck {
+    //| option java_type_method = "PBCommandType";
+    //| option java_implments = "org.apache.activemq.pbwire.PBCommand<PBJournalQueueAck>";
+    //| option java_visitor = "org.apache.activemq.pbwire.PBCommandVisitor:Object:java.io.IOException";
+
+    optional PBActiveMQDestination destination=1;
+    optional PBMessageAck messageAck=2;
+}
+
+message PBMessageDispatch {
+    //| option java_type_method = "PBCommandType";
+    //| option java_implments = "org.apache.activemq.pbwire.PBCommand<PBMessageDispatch>";
+    //| option java_visitor = "org.apache.activemq.pbwire.PBCommandVisitor:Object:java.io.IOException";
+
+    optional PBConsumerId consumerId=1;
+    optional PBActiveMQDestination destination=2;
+    optional PBMessage message=3;
+    optional int32 redeliveryCounter=4;
+}
+
+message PBMessageDispatchNotification {
+    //| option java_type_method = "PBCommandType";
+    //| option java_implments = "org.apache.activemq.pbwire.PBCommand<PBMessageDispatchNotification>";
+    //| option java_visitor = "org.apache.activemq.pbwire.PBCommandVisitor:Object:java.io.IOException";
+    
+    optional PBConsumerId consumerId=1;
+    optional PBActiveMQDestination destination=2;
+    optional PBMessageId messageId=3;
+    optional int64 deliverySequenceId=4;
+}
+
+message PBWireFormatInfo {
+    //| option java_type_method = "PBCommandType";
+    //| option java_implments = "org.apache.activemq.pbwire.PBCommand<PBWireFormatInfo>";
+    //| option java_visitor = "org.apache.activemq.pbwire.PBCommandVisitor:Object:java.io.IOException";
+
+	optional int32 version=1;
+	repeated PBKeyValue property=2;
+}
+
+message PBTransactionInfo {
+    //| option java_type_method = "PBCommandType";
+    //| option java_implments = "org.apache.activemq.pbwire.PBCommand<PBTransactionInfo>";
+    //| option java_visitor = "org.apache.activemq.pbwire.PBCommandVisitor:Object:java.io.IOException";
+    
+    enum PBTransactionType {
+	    BEGIN = 0;
+	    PREPARE = 1;
+	    COMMIT_ONE_PHASE = 2;
+	    COMMIT_TWO_PHASE = 3;
+	    ROLLBACK = 4;
+	    RECOVER = 5;
+	    FORGET = 6;
+	    END = 7;
+    }
+
+    optional PBTransactionType type=1;
+    optional string connectionId=2;
+    optional PBTransactionId transactionId=3;
+    
+}
+
+message PBShutdownInfo {
+    //| option java_type_method = "PBCommandType";
+    //| option java_implments = "org.apache.activemq.pbwire.PBCommand<PBShutdownInfo>";
+    //| option java_visitor = "org.apache.activemq.pbwire.PBCommandVisitor:Object:java.io.IOException";
+}
+
+message PBSessionInfo {
+    //| option java_type_method = "PBCommandType";
+    //| option java_implments = "org.apache.activemq.pbwire.PBCommand<PBSessionInfo>";
+    //| option java_visitor = "org.apache.activemq.pbwire.PBCommandVisitor:Object:java.io.IOException";
+
+	optional PBSessionId sessionId=1;
+}
+
+message PBResponse {
+    //| option java_type_method = "PBCommandType";
+    //| option java_implments = "org.apache.activemq.pbwire.PBCommand<PBResponse>";
+    //| option java_visitor = "org.apache.activemq.pbwire.PBCommandVisitor:Object:java.io.IOException";
+    
+	optional int32 correlationId=1;
+}
+
+message PBReplayCommand {
+    //| option java_type_method = "PBCommandType";
+    //| option java_implments = "org.apache.activemq.pbwire.PBCommand<PBReplayCommand>";
+    //| option java_visitor = "org.apache.activemq.pbwire.PBCommandVisitor:Object:java.io.IOException";
+
+    optional string producerId=1;
+    optional int32 firstAckNumber=2;
+    optional int32 lastAckNumber=3;
+    optional int32 firstNakNumber=4;
+    optional int32 lastNakNumber=5;
+
+}
+
+message PBRemoveSubscriptionInfo {
+    //| option java_type_method = "PBCommandType";
+    //| option java_implments = "org.apache.activemq.pbwire.PBCommand<PBRemoveSubscriptionInfo>";
+    //| option java_visitor = "org.apache.activemq.pbwire.PBCommandVisitor:Object:java.io.IOException";
+    
+    optional string connectionId=1;
+    optional string clientId=2;
+    optional string subscriptionName=3;
+    
+}
+
+message PBRemoveInfo {
+    //| option java_type_method = "PBCommandType";
+    //| option java_implments = "org.apache.activemq.pbwire.PBCommand<PBRemoveInfo>";
+    //| option java_visitor = "org.apache.activemq.pbwire.PBCommandVisitor:Object:java.io.IOException";
+
+	optional string connection_id=1;
+	optional PBSessionId session_id=2;
+	optional PBConsumerId consumer_id=3;
+	optional PBProducerId producer_id=4;
+}
+
+message PBProducerAck {
+    //| option java_type_method = "PBCommandType";
+    //| option java_implments = "org.apache.activemq.pbwire.PBCommand<PBProducerAck>";
+    //| option java_visitor = "org.apache.activemq.pbwire.PBCommandVisitor:Object:java.io.IOException";
+    
+    optional PBProducerId producerId=1;
+    optional int32 size=2;
+
+}
+
+message PBPartialCommand {
+    //| option java_type_method = "PBCommandType";
+    //| option java_implments = "org.apache.activemq.pbwire.PBCommand<PBPartialCommand>";
+    //| option java_visitor = "org.apache.activemq.pbwire.PBCommandVisitor:Object:java.io.IOException";
+    
+    optional int32 command_id=1;
+    optional bytes data=2;
+}
+
+message PBMessagePull {
+    //| option java_type_method = "PBCommandType";
+    //| option java_implments = "org.apache.activemq.pbwire.PBCommand<PBMessagePull>";
+    //| option java_visitor = "org.apache.activemq.pbwire.PBCommandVisitor:Object:java.io.IOException";
+
+    optional PBConsumerId consumerId=1;
+    optional PBActiveMQDestination destination=2;
+    optional int64 timeout=3;
+    optional PBMessageId messageId=4;
+    optional string correlationId=5;
+	
+}
+
+message PBProducerInfo {
+    //| option java_type_method = "PBCommandType";
+    //| option java_implments = "org.apache.activemq.pbwire.PBCommand<PBProducerInfo>";
+    //| option java_visitor = "org.apache.activemq.pbwire.PBCommandVisitor:Object:java.io.IOException";
+    
+    optional PBProducerId producerId=1;
+    optional PBActiveMQDestination destination=2;
+    repeated string brokerPath=3;
+    optional bool dispatchAsync=4;
+    optional int32 windowSize=5;
+}
+
+message PBMessageAck { // Acknowledgement for a range of messages (first..last id) on behalf of a consumer.
+    //| option java_type_method = "PBCommandType";
+    //| option java_implments = "org.apache.activemq.pbwire.PBCommand<PBMessageAck>";
+    //| option java_visitor = "org.apache.activemq.pbwire.PBCommandVisitor:Object:java.io.IOException";
+    // Kinds of acknowledgement selectable through the ackType field.
+    enum PBAckType {
+	    DELIVERED_ACK_TYPE = 0;
+	    POSION_ACK_TYPE = 1;
+	    STANDARD_ACK_TYPE = 2;
+	    REDELIVERED_ACK_TYPE = 3;
+	    INDIVIDUAL_ACK_TYPE = 4;
+    }
+    // NOTE(review): POSION_ACK_TYPE is presumably a misspelling of POISON; renaming changes the generated constant -- confirm first.
+    optional PBAckType ackType=1;
+    optional PBConsumerId consumerId=2;
+    optional PBMessageId firstMessageId=3;
+    optional PBMessageId lastMessageId=4;
+    optional PBActiveMQDestination destination=5;
+    optional PBTransactionId transactionId=6;
+    optional int32 messageCount=7;
+}
+
+message PBKeepAliveInfo {
+    //| option java_type_method = "PBCommandType";
+    //| option java_implments = "org.apache.activemq.pbwire.PBCommand<PBKeepAliveInfo>";
+    //| option java_visitor = "org.apache.activemq.pbwire.PBCommandVisitor:Object:java.io.IOException";
+}
+
+message PBFlushCommand {
+    //| option java_type_method = "PBCommandType";
+    //| option java_implments = "org.apache.activemq.pbwire.PBCommand<PBFlushCommand>";
+    //| option java_visitor = "org.apache.activemq.pbwire.PBCommandVisitor:Object:java.io.IOException";
+}
+
+message PBDestinationInfo {
+    //| option java_type_method = "PBCommandType";
+    //| option java_implments = "org.apache.activemq.pbwire.PBCommand<PBDestinationInfo>";
+    //| option java_visitor = "org.apache.activemq.pbwire.PBCommandVisitor:Object:java.io.IOException";
+    
+    
+	enum PBDestinationOperationType {
+	    ADD_OPERATION_TYPE = 0;
+	    REMOVE_OPERATION_TYPE = 1;
+	}
+	
+    optional string connection_id=1;
+    optional PBActiveMQDestination destination=2;
+    optional PBDestinationOperationType operation_type=3;
+    optional int64 timeout=4;
+    repeated string broker_path=5;
+}
+
+message PBControlCommand {
+    //| option java_type_method = "PBCommandType";
+    //| option java_implments = "org.apache.activemq.pbwire.PBCommand<PBControlCommand>";
+    //| option java_visitor = "org.apache.activemq.pbwire.PBCommandVisitor:Object:java.io.IOException";
+    
+    required string command=1;
+}
+
+message PBConsumerInfo {
+    //| option java_type_method = "PBCommandType";
+    //| option java_implments = "org.apache.activemq.pbwire.PBCommand<PBConsumerInfo>";
+    //| option java_visitor = "org.apache.activemq.pbwire.PBCommandVisitor:Object:java.io.IOException";
+    
+    required PBConsumerId consumer_id=1;
+    required PBActiveMQDestination destination=2;
+    optional int32 prefetch_size=3;
+    optional int32 maximum_pending_message_limit=4;
+    optional bool browser=5;
+    optional bool dispatch_async=6;
+    optional string selector=7;
+    optional string subscription_name=8;
+    optional bool no_local=9;
+    optional bool exclusive=10;
+    optional bool retroactive=11;
+    optional int32 priority=12;
+    repeated string broker_path=13;
+    optional bool optimized_acknowledge=14;
+    optional bool no_range_acks=15;
+
+    // protected BooleanExpression additionalPredicate;
+    
+}
+
+message PBConsumerControl {
+    //| option java_type_method = "PBCommandType";
+    //| option java_implments = "org.apache.activemq.pbwire.PBCommand<PBConsumerControl>";
+    //| option java_visitor = "org.apache.activemq.pbwire.PBCommandVisitor:Object:java.io.IOException";
+    
+    required PBConsumerId consumer_id=1;
+    optional bool close=2;
+    optional bool stop=3;
+    optional bool start=4;
+    optional bool flush=5;
+    optional int32 prefetch=6;
+    
+}
+
+message PBConnectionInfo {
+    //| option java_type_method = "PBCommandType";
+    //| option java_implments = "org.apache.activemq.pbwire.PBCommand<PBConnectionInfo>";
+    //| option java_visitor = "org.apache.activemq.pbwire.PBCommandVisitor:Object:java.io.IOException";
+
+    optional string connectionId=1;
+    optional string clientId=2;
+    optional string userName=3;
+    optional string password=4;
+    repeated string broker_path=5;
+    optional bool broker_master_connector=6;
+    optional bool manageable=7;
+    optional bool client_master=8;
+}
+
+message PBConnectionError {
+    //| option java_type_method = "PBCommandType";
+    //| option java_implments = "org.apache.activemq.pbwire.PBCommand<PBConnectionError>";
+    //| option java_visitor = "org.apache.activemq.pbwire.PBCommandVisitor:Object:java.io.IOException";
+
+    optional string connection_id=1;
+    optional PBThrowable exception=2;
+}
+
+message PBConnectionControl {
+    //| option java_type_method = "PBCommandType";
+    //| option java_implments = "org.apache.activemq.pbwire.PBCommand<PBConnectionControl>";
+    //| option java_visitor = "org.apache.activemq.pbwire.PBCommandVisitor:Object:java.io.IOException";
+
+    optional bool suspend=1;
+    optional bool resume=2;
+    optional bool close=3;
+    optional bool exit=4;
+    optional bool fault_tolerant=5;
+}
+
+message PBBrokerInfo {
+    //| option java_type_method = "PBCommandType";
+    //| option java_implments = "org.apache.activemq.pbwire.PBCommand<PBBrokerInfo>";
+    //| option java_visitor = "org.apache.activemq.pbwire.PBCommandVisitor:Object:java.io.IOException";
+    
+    optional string broker_id=1;
+    optional string broker_name=2;
+    optional string broker_url=3;
+    
+    repeated PBBrokerInfo peer_broker_infos=4;
+    optional int64 connectionId=5;
+    optional string broker_upload_url=6;
+    optional string network_properties=7;
+
+    optional bool slave_broker=8;
+    optional bool master_broker=9;
+    optional bool fault_tolerant_configuration=10;
+    optional bool network_connection=11;
+    optional bool duplex_connection=12;    
+}
+
+message PBMessage {
+	//| option java_type_method = "PBCommandType";
+    //| option java_implments = "org.apache.activemq.pbwire.PBCommand<PBMessage>";
+    //| option java_visitor = "org.apache.activemq.pbwire.PBCommandVisitor:Object:java.io.IOException";
+	
+	
+	enum PBMessageType {
+	    EMPTY_MESSAGE = 0;
+	    TEXT_MESSAGE = 1;
+	    BYTES_MESSAGE = 2;
+	    OBJECT_MESSAGE = 3;
+	    STREAM_MESSAGE = 4;
+	    MAP_MESSAGE = 5;
+    }
+	
+	required PBMessageType message_type=1;
+    required PBMessageId message_id=2;
+    required PBActiveMQDestination destination=3;
+    optional PBTransactionId transaction_id=4;
+    optional bool persistent=5 [default = false];
+    
+    optional bool compressed=6 [default = false];
+    optional bytes content=7;
+	repeated PBKeyValue property=8;
+
+    optional int64 timestamp=9;
+    optional int32 priority=10 [default = 4];
+    optional int64 expiration=11;
+    optional string type=12;
+    optional PBActiveMQDestination replyTo=13;
+    optional string correlationId=14;
+    optional string groupID=15;
+    optional int32 groupSequence=16;
+    
+    // Fields set by the broker: TODO perhaps we should 
+    // put this in a separate message class.
+    
+    optional string user_id=17;
+    optional int32 redeliveryCounter=18;
+    // Used when a message is routed to a new destination..
+    optional PBActiveMQDestination original_destination=22;
+    optional PBTransactionId original_transaction_id=23;
+	// Set by a network connector.    
+    repeated string brokerPath=26;
+    
+    optional PBConsumerId target_consumer_id=27;
+    optional int64 arrival=28;
+    optional bool droppable=29;
+    
+    repeated string cluster=30;
+    
+    optional int64 brokerInTime=31;
+    optional int64 brokerOutTime=32;
+}

Added: activemq/sandbox/chirino-pb/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/wireformat/pb
URL: http://svn.apache.org/viewvc/activemq/sandbox/chirino-pb/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/wireformat/pb?rev=704995&view=auto
==============================================================================
--- activemq/sandbox/chirino-pb/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/wireformat/pb (added)
+++ activemq/sandbox/chirino-pb/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/wireformat/pb Wed Oct 15 12:12:26 2008
@@ -0,0 +1,17 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+## 
+## http://www.apache.org/licenses/LICENSE-2.0
+## 
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+class=org.apache.activemq.pbwire.PBWireFormatFactory
\ No newline at end of file

Modified: activemq/sandbox/chirino-pb/activemq-core/src/test/java/org/apache/activemq/broker/BrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/chirino-pb/activemq-core/src/test/java/org/apache/activemq/broker/BrokerTest.java?rev=704995&r1=704994&r2=704995&view=diff
==============================================================================
--- activemq/sandbox/chirino-pb/activemq-core/src/test/java/org/apache/activemq/broker/BrokerTest.java (original)
+++ activemq/sandbox/chirino-pb/activemq-core/src/test/java/org/apache/activemq/broker/BrokerTest.java Wed Oct 15 12:12:26 2008
@@ -60,9 +60,9 @@
         ConnectionInfo connectionInfo1 = createConnectionInfo();
         SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
         ProducerInfo producerInfo = createProducerInfo(sessionInfo1);
-        connection1.send(connectionInfo1);
-        connection1.send(sessionInfo1);
-        connection1.send(producerInfo);
+        connection1.request(connectionInfo1);
+        connection1.request(sessionInfo1);
+        connection1.request(producerInfo);
 
         ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
         consumerInfo1.setPrefetchSize(1);

Modified: activemq/sandbox/chirino-pb/activemq-core/src/test/java/org/apache/activemq/command/ActiveMQMessageTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/chirino-pb/activemq-core/src/test/java/org/apache/activemq/command/ActiveMQMessageTest.java?rev=704995&r1=704994&r2=704995&view=diff
==============================================================================
--- activemq/sandbox/chirino-pb/activemq-core/src/test/java/org/apache/activemq/command/ActiveMQMessageTest.java (original)
+++ activemq/sandbox/chirino-pb/activemq-core/src/test/java/org/apache/activemq/command/ActiveMQMessageTest.java Wed Oct 15 12:12:26 2008
@@ -386,7 +386,8 @@
     }
 
     public void testConvertProperties() throws Exception {
-        org.apache.activemq.command.Message msg = new org.apache.activemq.command.Message() {
+        org.apache.activemq.command.Message msg = new org.apache.activemq.command.Message(new PBMessage()) {
+            
             public org.apache.activemq.command.Message copy() {
                 return null;
             }

Added: activemq/sandbox/chirino-pb/activemq-core/src/test/java/org/apache/activemq/pbwire/PBAMQStoreQueueTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/chirino-pb/activemq-core/src/test/java/org/apache/activemq/pbwire/PBAMQStoreQueueTest.java?rev=704995&view=auto
==============================================================================
--- activemq/sandbox/chirino-pb/activemq-core/src/test/java/org/apache/activemq/pbwire/PBAMQStoreQueueTest.java (added)
+++ activemq/sandbox/chirino-pb/activemq-core/src/test/java/org/apache/activemq/pbwire/PBAMQStoreQueueTest.java Wed Oct 15 12:12:26 2008
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.pbwire;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.perf.AMQStoreQueueTest;
+import org.apache.activemq.store.amq.AMQPersistenceAdapter;
+
+public class PBAMQStoreQueueTest extends AMQStoreQueueTest { // AMQStoreQueueTest re-run with wireFormat=pb URIs
+    /** Sets the client and bind URIs to port 61616 with wireFormat=pb, then runs the inherited setUp. */
+    @Override
+    protected void setUp() throws Exception {
+        clientURI="tcp://localhost:61616?wireFormat=pb&jms.useAsyncSend=false&wireFormat.maxInactivityDuration=50000";
+        bindAddress="tcp://localhost:61616?wireFormat=pb";
+        super.setUp();
+    }
+    /** Applies the inherited configuration, then gives the AMQ persistence adapter a PBWireFormat. */
+    @Override
+    protected void configureBroker(BrokerService answer, String uri) throws Exception {
+        super.configureBroker(answer, uri);
+        ((AMQPersistenceAdapter)answer.getPersistenceAdapter()).setWireFormat(new PBWireFormat());
+    }
+}

Added: activemq/sandbox/chirino-pb/activemq-core/src/test/java/org/apache/activemq/pbwire/SimpleTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/chirino-pb/activemq-core/src/test/java/org/apache/activemq/pbwire/SimpleTest.java?rev=704995&view=auto
==============================================================================
--- activemq/sandbox/chirino-pb/activemq-core/src/test/java/org/apache/activemq/pbwire/SimpleTest.java (added)
+++ activemq/sandbox/chirino-pb/activemq-core/src/test/java/org/apache/activemq/pbwire/SimpleTest.java Wed Oct 15 12:12:26 2008
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.pbwire;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.store.amq.AMQPersistenceAdapter;
+
+/** Smoke test: sends one text message through a broker reachable over the "pb" wire format. */
+public class SimpleTest extends TestCase {
+    private BrokerService broker;
+
+    @Override
+    protected void setUp() throws Exception {
+        broker = createBroker();
+        AMQPersistenceAdapter pa = (AMQPersistenceAdapter) broker.getPersistenceAdapter();
+        pa.setWireFormat(new PBWireFormat());
+        broker.start();
+    }
+
+    private BrokerService createBroker() throws Exception {
+        BrokerService b = new BrokerService();
+        b.addConnector("tcp://localhost:61616?wireFormat=pb");
+        return b;
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        broker.stop();
+    }
+
+    public void testQueues() throws JMSException {
+        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616?wireFormat=pb");
+        Destination destination=new ActiveMQQueue("TEST");
+        Connection conn1 = cf.createConnection();
+        try {
+            conn1.start();
+            Session session1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageProducer producer = session1.createProducer(destination);
+            MessageConsumer consumer = session1.createConsumer(destination);
+            producer.send(session1.createTextMessage("Hello"));
+            Message message = consumer.receive(5000);
+            // receive(timeout) yields null on timeout: fail clearly instead of with an NPE below.
+            assertNotNull("no message received within 5000 ms", message);
+            assertEquals("Hello", ((TextMessage)message).getText());
+        } finally {
+            // Close even when an assertion fails so the connection is not leaked into later tests.
+            conn1.close();
+        }
+    }
+}



Mime
View raw message