activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cmacn...@apache.org
Subject svn commit: r908857 [17/17] - in /activemq/sandbox/activemq-apollo-actor/activemq-amqp: ./ src/ src/main/ src/main/java/ src/main/java/org/ src/main/java/org/apache/ src/main/java/org/apache/activemq/ src/main/java/org/apache/activemq/amqp/ src/main/ja...
Date Thu, 11 Feb 2010 07:04:46 GMT
Added: activemq/sandbox/activemq-apollo-actor/activemq-amqp/src/main/java/org/apache/activemq/amqp/protocol/types/AmqpTxn.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-amqp/src/main/java/org/apache/activemq/amqp/protocol/types/AmqpTxn.java?rev=908857&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-amqp/src/main/java/org/apache/activemq/amqp/protocol/types/AmqpTxn.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-amqp/src/main/java/org/apache/activemq/amqp/protocol/types/AmqpTxn.java Thu Feb 11 07:04:21 2010
@@ -0,0 +1,399 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.amqp.protocol.types;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.lang.Boolean;
+import java.util.Iterator;
+import org.apache.activemq.amqp.protocol.AmqpCommand;
+import org.apache.activemq.amqp.protocol.AmqpCommandHandler;
+import org.apache.activemq.amqp.protocol.marshaller.AmqpEncodingError;
+import org.apache.activemq.amqp.protocol.marshaller.AmqpMarshaller;
+import org.apache.activemq.amqp.protocol.marshaller.Encoded;
+import org.apache.activemq.amqp.protocol.types.IAmqpList;
+import org.apache.activemq.util.buffer.Buffer;
+
+/**
+ * Represents a command that marks transaction boundaries
+ * <p>
+ * This command is called when the work done on behalf of a transaction branch finishes or needs
+ * to be suspended. If neither fail nor suspend are specified then the portion of work has
+ * completed successfully. When a Session is closed then the currently associated transaction
+ * branches MUST be marked rollback-only.
+ * </p>
+ */
+public interface AmqpTxn extends AmqpList, AmqpCommand {
+
+
+
+    /**
+     * options map
+     */
+    public void setOptions(AmqpOptions options);
+
+    /**
+     * options map
+     */
+    public AmqpOptions getOptions();
+
+    /**
+     * Failure flag
+     * <p>
+     * If set, indicates that this portion of work has failed; otherwise this portion of work
+     * has completed successfully. An implementation MAY elect to roll a transaction back if
+     * this failure notification is received. Should an implementation elect to implement this
+     * behavior, and this bit is set, then the transaction branch SHOULD be marked as
+     * rollback-only and the end result SHOULD have the xa-rbrollback status set.
+     * </p>
+     */
+    public void setFail(Boolean fail);
+
+    /**
+     * Failure flag
+     * <p>
+     * If set, indicates that this portion of work has failed; otherwise this portion of work
+     * has completed successfully. An implementation MAY elect to roll a transaction back if
+     * this failure notification is received. Should an implementation elect to implement this
+     * behavior, and this bit is set, then the transaction branch SHOULD be marked as
+     * rollback-only and the end result SHOULD have the xa-rbrollback status set.
+     * </p>
+     */
+    public void setFail(AmqpBoolean fail);
+
+    /**
+     * Failure flag
+     * <p>
+     * If set, indicates that this portion of work has failed; otherwise this portion of work
+     * has completed successfully. An implementation MAY elect to roll a transaction back if
+     * this failure notification is received. Should an implementation elect to implement this
+     * behavior, and this bit is set, then the transaction branch SHOULD be marked as
+     * rollback-only and the end result SHOULD have the xa-rbrollback status set.
+     * </p>
+     */
+    public Boolean getFail();
+
+    /**
+     * Temporary suspension flag
+     * <p>
+     * Indicates that the transaction branch is temporarily suspended in an incomplete state.
+     * The transaction context is in a suspended state and must be resumed via the enlist
+     * command with resume specified.
+     * </p>
+     */
+    public void setSuspend(Boolean suspend);
+
+    /**
+     * Temporary suspension flag
+     * <p>
+     * Indicates that the transaction branch is temporarily suspended in an incomplete state.
+     * The transaction context is in a suspended state and must be resumed via the enlist
+     * command with resume specified.
+     * </p>
+     */
+    public void setSuspend(AmqpBoolean suspend);
+
+    /**
+     * Temporary suspension flag
+     * <p>
+     * Indicates that the transaction branch is temporarily suspended in an incomplete state.
+     * The transaction context is in a suspended state and must be resumed via the enlist
+     * command with resume specified.
+     * </p>
+     */
+    public Boolean getSuspend();
+
+    public static class AmqpTxnBean implements AmqpTxn{
+        // In-memory form of the txn command; fields are read through 'bean' (see the copy constructor).
+        private AmqpTxnBuffer buffer;
+        private AmqpTxnBean bean = this;
+        private AmqpOptions options;
+        private AmqpBoolean fail;
+        private AmqpBoolean suspend;
+        /** Creates an empty bean: options, fail and suspend are all unset (null). */
+        public AmqpTxnBean() {
+        }
+        /** Fills the bean by passing each element of {@code value} to {@link #set(int, AmqpType)} in index order. */
+        public AmqpTxnBean(IAmqpList value) {
+            //TODO we should defer decoding of the described type:
+            for(int i = 0; i < value.getListCount(); i++) {
+                set(i, value.get(i));
+            }
+        }
+        /** Creates a bean that reads through {@code other} until its first setter call copies the fields over. */
+        public AmqpTxnBean(AmqpTxn.AmqpTxnBean other) {
+            this.bean = other;
+        }
+        /** Returns a new bean that shares this bean's current backing state. */
+        public final AmqpTxnBean copy() {
+            return new AmqpTxn.AmqpTxnBean(bean);
+        }
+        /** Dispatches this command to {@code handler.handleTxn}. */
+        public final void handle(AmqpCommandHandler handler) throws Exception {
+            handler.handleTxn(this);
+        }
+        /** Returns the encoded form, encoding with {@code marshaller} on the first call and caching the result. */
+        public final AmqpTxn.AmqpTxnBuffer getBuffer(AmqpMarshaller marshaller) throws AmqpEncodingError{
+            if(buffer == null) {
+                buffer = new AmqpTxnBuffer(marshaller.encode(this));
+            }
+            return buffer;
+        }
+        /** Writes the encoded form to {@code out}. */
+        public final void marshal(DataOutput out, AmqpMarshaller marshaller) throws IOException, AmqpEncodingError{
+            getBuffer(marshaller).marshal(out, marshaller);
+        }
+
+        /** Sets the options map; throws IllegalStateException once the bean has an encoded buffer. */
+        public final void setOptions(AmqpOptions options) {
+            copyCheck();
+            bean.options = options;
+        }
+        /** Returns the options map, or null when unset. */
+        public final AmqpOptions getOptions() {
+            return bean.options;
+        }
+        /** Wraps {@code fail} in an AmqpBoolean bean and stores it. */
+        public void setFail(Boolean fail) {
+            setFail(new AmqpBoolean.AmqpBooleanBean(fail));
+        }
+
+        /** Sets the failure flag; throws IllegalStateException once the bean has an encoded buffer. */
+        public final void setFail(AmqpBoolean fail) {
+            copyCheck();
+            bean.fail = fail;
+        }
+        /** Returns the failure flag, or null when unset; null-safe because a new or partially decoded bean has no flag object. */
+        public final Boolean getFail() {
+            return bean.fail == null ? null : bean.fail.getValue();
+        }
+        /** Wraps {@code suspend} in an AmqpBoolean bean and stores it. */
+        public void setSuspend(Boolean suspend) {
+            setSuspend(new AmqpBoolean.AmqpBooleanBean(suspend));
+        }
+
+        /** Sets the suspension flag; throws IllegalStateException once the bean has an encoded buffer. */
+        public final void setSuspend(AmqpBoolean suspend) {
+            copyCheck();
+            bean.suspend = suspend;
+        }
+        /** Returns the suspension flag, or null when unset; null-safe for the same reason as {@link #getFail()}. */
+        public final Boolean getSuspend() {
+            return bean.suspend == null ? null : bean.suspend.getValue();
+        }
+        /** Sets the field at list position {@code index} (0 = options, 1 = fail, 2 = suspend); other indexes throw IndexOutOfBoundsException. */
+        public void set(int index, AmqpType<?, ?> value) {
+            switch(index) {
+            case 0: {
+                setOptions((AmqpOptions) value);
+                break;
+            }
+            case 1: {
+                setFail((AmqpBoolean) value);
+                break;
+            }
+            case 2: {
+                setSuspend((AmqpBoolean) value);
+                break;
+            }
+            default : {
+                throw new IndexOutOfBoundsException(String.valueOf(index));
+            }
+            }
+        }
+        /** Returns the field at list position {@code index} (0 = options, 1 = fail, 2 = suspend); other indexes throw IndexOutOfBoundsException. */
+        public AmqpType<?, ?> get(int index) {
+            switch(index) {
+            case 0: {
+                return bean.options;
+            }
+            case 1: {
+                return bean.fail;
+            }
+            case 2: {
+                return bean.suspend;
+            }
+            default : {
+                throw new IndexOutOfBoundsException(String.valueOf(index));
+            }
+            }
+        }
+        /** Number of list positions; always 3. */
+        public int getListCount() {
+            return 3;
+        }
+        /** Returns the backing bean as the list value. */
+        public IAmqpList getValue() {
+            return bean;
+        }
+        /** Iterates over the backing bean's fields via AmqpListIterator. */
+        public Iterator<AmqpType<?, ?>> iterator() {
+            return new AmqpListIterator(bean);
+        }
+
+        /** Guards every write: rejects writes once a buffer is attached, and detaches a lazy copy from its source. */
+        private final void copyCheck() {
+            if(buffer != null) {
+                throw new IllegalStateException("unwriteable");
+            }
+            if(bean != this) {
+                copy(bean);
+            }
+        }
+        /** Copies the fields of {@code other} into this bean and makes this bean its own backing state. */
+        private final void copy(AmqpTxn.AmqpTxnBean other) {
+            this.options= other.options;
+            this.fail= other.fail;
+            this.suspend= other.suspend;
+            bean = this;
+        }
+        /** True when {@code t} is this object or an AmqpTxn whose options, fail and suspend match this bean's. */
+        public boolean equivalent(AmqpType<?,?> t){
+            if(this == t) {
+                return true;
+            }
+
+            if(t == null || !(t instanceof AmqpTxn)) {
+                return false;
+            }
+
+            return equivalent((AmqpTxn) t);
+        }
+        /** Field-by-field comparison; an unset (null) field only matches an unset field, and a null {@code b} is never equivalent. */
+        public boolean equivalent(AmqpTxn b) {
+            if(b == null) { return false; }
+            if(b.getOptions() == null ^ getOptions() == null) {
+                return false;
+            }
+            if(b.getOptions() != null && !b.getOptions().equals(getOptions())){ 
+                return false;
+            }
+
+            if(b.getFail() == null ^ getFail() == null) {
+                return false;
+            }
+            if(b.getFail() != null && !b.getFail().equals(getFail())){ 
+                return false;
+            }
+
+            if(b.getSuspend() == null ^ getSuspend() == null) {
+                return false;
+            }
+            if(b.getSuspend() != null && !b.getSuspend().equals(getSuspend())){ 
+                return false;
+            }
+            return true;
+        }
+    }
+
+    public static class AmqpTxnBuffer extends AmqpList.AmqpListBuffer implements AmqpTxn{
+        // Decoded bean, created lazily by bean() on first field access.
+        private AmqpTxnBean bean;
+        /** Passes the encoded list to the AmqpListBuffer constructor; nothing is decoded in this constructor. */
+        protected AmqpTxnBuffer(Encoded<IAmqpList> encoded) {
+            super(encoded);
+        }
+        // NOTE(review): bean() attaches this buffer to the decoded bean, and AmqpTxnBean's setters throw IllegalStateException("unwriteable") when a buffer is attached, so the setters below look like they always throw - confirm intent.
+        public final void setOptions(AmqpOptions options) {
+            bean().setOptions(options);
+        }
+
+        public final AmqpOptions getOptions() {
+            return bean().getOptions();
+        }
+
+    public void setFail(Boolean fail) {
+            bean().setFail(fail);
+        }
+
+        public final void setFail(AmqpBoolean fail) {
+            bean().setFail(fail);
+        }
+
+        public final Boolean getFail() {
+            return bean().getFail();
+        }
+
+    public void setSuspend(Boolean suspend) {
+            bean().setSuspend(suspend);
+        }
+
+        public final void setSuspend(AmqpBoolean suspend) {
+            bean().setSuspend(suspend);
+        }
+
+        public final Boolean getSuspend() {
+            return bean().getSuspend();
+        }
+        // List view: set/get/getListCount/iterator/getValue all delegate to the decoded bean.
+        public void set(int index, AmqpType<?, ?> value) {
+            bean().set(index, value);
+        }
+
+        public AmqpType<?, ?> get(int index) {
+            return bean().get(index);
+        }
+
+        public int getListCount() {
+            return bean().getListCount();
+        }
+
+        public Iterator<AmqpType<?, ?>> iterator() {
+            return bean().iterator();
+        }
+
+        public IAmqpList getValue() {
+            return bean().getValue();
+        }
+        /** Returns this buffer itself; the marshaller argument is not used. */
+        public AmqpTxn.AmqpTxnBuffer getBuffer(AmqpMarshaller marshaller) throws AmqpEncodingError{
+            return this;
+        }
+        /** Returns the decoded bean, building it from encoded.getValue() on the first call and attaching this buffer to it. */
+        protected AmqpTxn bean() {
+            if(bean == null) {
+                bean = new AmqpTxn.AmqpTxnBean(encoded.getValue());
+                bean.buffer = this;
+            }
+            return bean;
+        }
+        /** Dispatches this command to handler.handleTxn. */
+        public final void handle(AmqpCommandHandler handler) throws Exception {
+            handler.handleTxn(this);
+        }
+        /** Delegates the equivalence check to the decoded bean. */
+        public boolean equivalent(AmqpType<?, ?> t) {
+            return bean().equivalent(t);
+        }
+        /** Returns null when encoded.isNull(), otherwise a new buffer wrapping encoded. */
+        public static AmqpTxn.AmqpTxnBuffer create(Encoded<IAmqpList> encoded) {
+            if(encoded.isNull()) {
+                return null;
+            }
+            return new AmqpTxn.AmqpTxnBuffer(encoded);
+        }
+        /** Reads a txn from in via marshaller.unmarshalAmqpTxn and wraps the result with create(Encoded). */
+        public static AmqpTxn.AmqpTxnBuffer create(DataInput in, AmqpMarshaller marshaller) throws IOException, AmqpEncodingError {
+            return create(marshaller.unmarshalAmqpTxn(in));
+        }
+        /** Decodes a txn from buffer at offset via marshaller.decodeAmqpTxn and wraps the result with create(Encoded). */
+        public static AmqpTxn.AmqpTxnBuffer create(Buffer buffer, int offset, AmqpMarshaller marshaller) throws AmqpEncodingError {
+            return create(marshaller.decodeAmqpTxn(buffer, offset));
+        }
+    }
+}
\ No newline at end of file

Added: activemq/sandbox/activemq-apollo-actor/activemq-amqp/src/main/java/org/apache/activemq/amqp/protocol/types/AmqpTxnLevel.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-amqp/src/main/java/org/apache/activemq/amqp/protocol/types/AmqpTxnLevel.java?rev=908857&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-amqp/src/main/java/org/apache/activemq/amqp/protocol/types/AmqpTxnLevel.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-amqp/src/main/java/org/apache/activemq/amqp/protocol/types/AmqpTxnLevel.java Thu Feb 11 07:04:21 2010
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.amqp.protocol.types;
+
+import java.lang.Short;
+import java.util.HashMap;
+import org.apache.activemq.amqp.protocol.marshaller.AmqpEncodingError;
+import org.apache.activemq.amqp.protocol.types.AmqpUbyte;
+
+/**
+ * Represents a transaction level
+ */
+public enum AmqpTxnLevel {
+    // Each constant carries the ubyte code that get(AmqpUbyte) resolves back to it.
+    LOCAL(Short.valueOf((short) 1)),
+    DISTRIBUTED(Short.valueOf((short) 2)),
+    PROMOTABLE(Short.valueOf((short) 3));
+    // Reverse index from code to constant; filled by the static block below once all constants exist.
+    private static final HashMap<Short, AmqpTxnLevel> LOOKUP = new HashMap<Short, AmqpTxnLevel>();
+    static {
+        for (AmqpTxnLevel txnLevel : AmqpTxnLevel.values()) {
+            LOOKUP.put(txnLevel.value.getValue(), txnLevel);
+        }
+    }
+
+    private final AmqpUbyte value;
+    private AmqpTxnLevel(Short value) {
+        this.value = new AmqpUbyte.AmqpUbyteBean(value);
+    }
+    /** Returns the code of this level wrapped as an AmqpUbyte. */
+    public final AmqpUbyte getValue() {
+        return value;
+    }
+    /** Resolves a code to its level; throws AmqpEncodingError when {@code value} is null or matches no level. */
+    public static final AmqpTxnLevel get(AmqpUbyte value) throws AmqpEncodingError{
+        Short code = value == null ? null : value.getValue(); // also used in the message: AmqpUbyte defines no toString()
+        AmqpTxnLevel txnLevel= LOOKUP.get(code);
+        if (txnLevel == null) {
+            //TODO perhaps this should be an IllegalArgumentException?
+            throw new AmqpEncodingError("Unknown txnLevel: " + code + " expected one of " + LOOKUP.keySet());
+        }
+        return txnLevel;
+    }
+}
\ No newline at end of file

Added: activemq/sandbox/activemq-apollo-actor/activemq-amqp/src/main/java/org/apache/activemq/amqp/protocol/types/AmqpType.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-amqp/src/main/java/org/apache/activemq/amqp/protocol/types/AmqpType.java?rev=908857&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-amqp/src/main/java/org/apache/activemq/amqp/protocol/types/AmqpType.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-amqp/src/main/java/org/apache/activemq/amqp/protocol/types/AmqpType.java Thu Feb 11 07:04:21 2010
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this work
+ * for additional information regarding copyright ownership. The ASF licenses
+ * this file to You under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.activemq.amqp.protocol.types;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.activemq.amqp.protocol.marshaller.AmqpEncodingError;
+import org.apache.activemq.amqp.protocol.marshaller.AmqpMarshaller;
+
+public interface AmqpType<Bean, EncodedBuffer extends AmqpBuffer<?>> { // Contract implemented by AMQP value types such as AmqpUbyte and AmqpUint.
+    /** Returns the encoded (buffer) form of this value; which encoding is produced is up to the marshaller. */
+    public EncodedBuffer getBuffer(AmqpMarshaller marshaller) throws AmqpEncodingError;
+    /** Writes this value to {@code out}; presumably in its encoded form - confirm against the implementations. */
+    public void marshal(DataOutput out, AmqpMarshaller marshaller) throws IOException, AmqpEncodingError;
+    /** Reports whether {@code t} is equivalent to this value; the exact comparison is defined by each implementation. */
+    public boolean equivalent(AmqpType<?,?> t);
+}

Added: activemq/sandbox/activemq-apollo-actor/activemq-amqp/src/main/java/org/apache/activemq/amqp/protocol/types/AmqpUbyte.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-amqp/src/main/java/org/apache/activemq/amqp/protocol/types/AmqpUbyte.java?rev=908857&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-amqp/src/main/java/org/apache/activemq/amqp/protocol/types/AmqpUbyte.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-amqp/src/main/java/org/apache/activemq/amqp/protocol/types/AmqpUbyte.java Thu Feb 11 07:04:21 2010
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.amqp.protocol.types;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.lang.Short;
+import org.apache.activemq.amqp.protocol.marshaller.AmqpEncodingError;
+import org.apache.activemq.amqp.protocol.marshaller.AmqpMarshaller;
+import org.apache.activemq.amqp.protocol.marshaller.Encoded;
+import org.apache.activemq.util.buffer.Buffer;
+
+/**
+ * Represents an integer in the range 0 to 2^8 - 1
+ */
+public interface AmqpUbyte extends AmqpType<AmqpUbyte.AmqpUbyteBean, AmqpUbyte.AmqpUbyteBuffer> {
+
+
+    public Short getValue();
+
+    public static class AmqpUbyteBean implements AmqpUbyte{
+        // Plain-value form of an AMQP ubyte; the encoded form is produced on demand.
+
+        // Encoded form, cached by getBuffer() or attached by AmqpUbyteBuffer.bean().
+        private AmqpUbyteBuffer buffer;
+        // Bean whose value is actually read: 'this' unless built by the copy constructor.
+        private AmqpUbyteBean bean = this;
+        // The wrapped number; null means unset.
+        private Short value;
+
+        /** Creates a bean whose value is unset (null). */
+        protected AmqpUbyteBean() {
+        }
+
+        /** Creates a bean holding {@code value}; no range check is performed. */
+        public AmqpUbyteBean(Short value) {
+            this.value = value;
+        }
+
+        /** Creates a bean that reads its value through {@code other}. */
+        public AmqpUbyteBean(AmqpUbyte.AmqpUbyteBean other) {
+            this.bean = other;
+        }
+
+        /** Returns the backing bean itself; no new object is allocated. */
+        public final AmqpUbyteBean copy() {
+            return bean;
+        }
+
+        /** Returns the encoded form, encoding with {@code marshaller} on the first call only. */
+        public final AmqpUbyte.AmqpUbyteBuffer getBuffer(AmqpMarshaller marshaller) throws AmqpEncodingError{
+            if(buffer != null) {
+                return buffer;
+            }
+            buffer = new AmqpUbyteBuffer(marshaller.encode(this));
+            return buffer;
+        }
+
+        /** Writes the encoded form to {@code out}. */
+        public final void marshal(DataOutput out, AmqpMarshaller marshaller) throws IOException, AmqpEncodingError{
+            final AmqpUbyteBuffer encodedForm = getBuffer(marshaller);
+            encodedForm.marshal(out, marshaller);
+        }
+
+        /** Returns the value of the backing bean, or null when unset. */
+        public Short getValue() {
+            return bean.value;
+        }
+
+        /** Equal to this object itself and to any AmqpUbyte (bean or buffer) with an equivalent value. */
+        public boolean equals(Object o){
+            // instanceof is false for null, so no separate null test is needed
+            return this == o || (o instanceof AmqpUbyte && equivalent((AmqpUbyte) o));
+        }
+
+        // Hash of the wrapped value; an unset value falls back to the hash code of
+        // the bean class object so that all unset beans hash alike.
+        public int hashCode() {
+            final Short current = getValue();
+            return current != null ? current.hashCode() : AmqpUbyte.AmqpUbyteBean.class.hashCode();
+        }
+
+        /** True when {@code t} is this object or an AmqpUbyte with an equivalent value. */
+        public boolean equivalent(AmqpType<?,?> t){
+            return this == t || (t instanceof AmqpUbyte && equivalent((AmqpUbyte) t));
+        }
+
+        /** True when both values are unset, or both are set and equal; a null {@code b} is never equivalent. */
+        public boolean equivalent(AmqpUbyte b) {
+            if(b == null) {
+                return false;
+            }
+            final Short theirs = b.getValue();
+            final Short mine = getValue();
+            if(theirs == null || mine == null) {
+                // at least one side is unset: equivalent only when both are
+                return theirs == null && mine == null;
+            }
+            return theirs.equals(mine);
+        }
+    }
+
+    public static class AmqpUbyteBuffer implements AmqpUbyte, AmqpBuffer< Short> {
+        // Encoded form of an AMQP ubyte; 'bean' is the decoded value, created lazily by bean().
+        private AmqpUbyteBean bean;
+        protected Encoded<Short> encoded;
+        /** Creates a buffer with no encoded data; 'encoded' stays null unless a subclass assigns it. */
+        protected AmqpUbyteBuffer() {
+        }
+        /** Wraps {@code encoded} without decoding it. */
+        protected AmqpUbyteBuffer(Encoded<Short> encoded) {
+            this.encoded = encoded;
+        }
+        /** Returns the wrapped encoded value. */
+        public final Encoded<Short> getEncoded() throws AmqpEncodingError{
+            return encoded;
+        }
+        /** Writes the encoded value to {@code out}; the marshaller argument is not used. */
+        public final void marshal(DataOutput out, AmqpMarshaller marshaller) throws IOException, AmqpEncodingError{
+            encoded.marshal(out);
+        }
+        /** Returns the decoded value, decoding on first access. */
+        public Short getValue() {
+            return bean().getValue();
+        }
+        /** Returns this buffer itself; the marshaller argument is not used. */
+        public AmqpUbyte.AmqpUbyteBuffer getBuffer(AmqpMarshaller marshaller) throws AmqpEncodingError{
+            return this;
+        }
+        /** Returns the decoded bean, building it from encoded.getValue() on the first call and attaching this buffer to it. */
+        protected AmqpUbyte bean() {
+            if(bean == null) {
+                bean = new AmqpUbyte.AmqpUbyteBean(encoded.getValue());
+                bean.buffer = this;
+            }
+            return bean;
+        }
+        /** Delegates to the decoded bean's equals. */
+        public boolean equals(Object o){
+            return bean().equals(o);
+        }
+        /** Delegates to the decoded bean's hashCode. */
+        public int hashCode() {
+            return bean().hashCode();
+        }
+        /** Delegates the equivalence check to the decoded bean. */
+        public boolean equivalent(AmqpType<?, ?> t) {
+            return bean().equivalent(t);
+        }
+        /** Returns null when encoded.isNull(), otherwise a new buffer wrapping encoded. */
+        public static AmqpUbyte.AmqpUbyteBuffer create(Encoded<Short> encoded) {
+            if(encoded.isNull()) {
+                return null;
+            }
+            return new AmqpUbyte.AmqpUbyteBuffer(encoded);
+        }
+        /** Reads a ubyte from in via marshaller.unmarshalAmqpUbyte and wraps the result with create(Encoded). */
+        public static AmqpUbyte.AmqpUbyteBuffer create(DataInput in, AmqpMarshaller marshaller) throws IOException, AmqpEncodingError {
+            return create(marshaller.unmarshalAmqpUbyte(in));
+        }
+        /** Decodes a ubyte from buffer at offset via marshaller.decodeAmqpUbyte and wraps the result with create(Encoded). */
+        public static AmqpUbyte.AmqpUbyteBuffer create(Buffer buffer, int offset, AmqpMarshaller marshaller) throws AmqpEncodingError {
+            return create(marshaller.decodeAmqpUbyte(buffer, offset));
+        }
+    }
+}
\ No newline at end of file

Added: activemq/sandbox/activemq-apollo-actor/activemq-amqp/src/main/java/org/apache/activemq/amqp/protocol/types/AmqpUint.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-amqp/src/main/java/org/apache/activemq/amqp/protocol/types/AmqpUint.java?rev=908857&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-amqp/src/main/java/org/apache/activemq/amqp/protocol/types/AmqpUint.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-amqp/src/main/java/org/apache/activemq/amqp/protocol/types/AmqpUint.java Thu Feb 11 07:04:21 2010
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.amqp.protocol.types;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.lang.Long;
+import org.apache.activemq.amqp.protocol.marshaller.AmqpEncodingError;
+import org.apache.activemq.amqp.protocol.marshaller.AmqpMarshaller;
+import org.apache.activemq.amqp.protocol.marshaller.Encoded;
+import org.apache.activemq.util.buffer.Buffer;
+
+/**
+ * Represents an integer in the range 0 to 2^32 - 1
+ */
+public interface AmqpUint extends AmqpType<AmqpUint.AmqpUintBean, AmqpUint.AmqpUintBuffer> {
+
+
+    public Long getValue();
+
+    public static class AmqpUintBean implements AmqpUint{
+        // Plain-value form of an AMQP uint; the encoded form is produced on demand.
+
+        // Encoded form, cached by getBuffer() or attached by AmqpUintBuffer.bean().
+        private AmqpUintBuffer buffer;
+        // Bean whose value is actually read: 'this' unless built by the copy constructor.
+        private AmqpUintBean bean = this;
+        // The wrapped number; null means unset.
+        private Long value;
+
+        /** Creates a bean whose value is unset (null). */
+        protected AmqpUintBean() {
+        }
+
+        /** Creates a bean holding {@code value}; no range check is performed. */
+        public AmqpUintBean(Long value) {
+            this.value = value;
+        }
+
+        /** Creates a bean that reads its value through {@code other}. */
+        public AmqpUintBean(AmqpUint.AmqpUintBean other) {
+            this.bean = other;
+        }
+
+        /** Returns the backing bean itself; no new object is allocated. */
+        public final AmqpUintBean copy() {
+            return bean;
+        }
+
+        /** Returns the encoded form, encoding with {@code marshaller} on the first call only. */
+        public final AmqpUint.AmqpUintBuffer getBuffer(AmqpMarshaller marshaller) throws AmqpEncodingError{
+            if(buffer != null) {
+                return buffer;
+            }
+            buffer = new AmqpUintBuffer(marshaller.encode(this));
+            return buffer;
+        }
+
+        /** Writes the encoded form to {@code out}. */
+        public final void marshal(DataOutput out, AmqpMarshaller marshaller) throws IOException, AmqpEncodingError{
+            final AmqpUintBuffer encodedForm = getBuffer(marshaller);
+            encodedForm.marshal(out, marshaller);
+        }
+
+        /** Returns the value of the backing bean, or null when unset. */
+        public Long getValue() {
+            return bean.value;
+        }
+
+        /** Equal to this object itself and to any AmqpUint (bean or buffer) with an equivalent value. */
+        public boolean equals(Object o){
+            // instanceof is false for null, so no separate null test is needed
+            return this == o || (o instanceof AmqpUint && equivalent((AmqpUint) o));
+        }
+
+        // Hash of the wrapped value; an unset value falls back to the hash code of
+        // the bean class object so that all unset beans hash alike.
+        public int hashCode() {
+            final Long current = getValue();
+            return current != null ? current.hashCode() : AmqpUint.AmqpUintBean.class.hashCode();
+        }
+
+        /** True when {@code t} is this object or an AmqpUint with an equivalent value. */
+        public boolean equivalent(AmqpType<?,?> t){
+            return this == t || (t instanceof AmqpUint && equivalent((AmqpUint) t));
+        }
+
+        /** True when both values are unset, or both are set and equal; a null {@code b} is never equivalent. */
+        public boolean equivalent(AmqpUint b) {
+            if(b == null) {
+                return false;
+            }
+            final Long theirs = b.getValue();
+            final Long mine = getValue();
+            if(theirs == null || mine == null) {
+                // at least one side is unset: equivalent only when both are
+                return theirs == null && mine == null;
+            }
+            return theirs.equals(mine);
+        }
+    }
+
+    public static class AmqpUintBuffer implements AmqpUint, AmqpBuffer< Long> {
+        // Encoded form of an AMQP uint; 'bean' is the decoded value, created lazily by bean().
+        private AmqpUintBean bean;
+        protected Encoded<Long> encoded;
+        /** Creates a buffer with no encoded data; 'encoded' stays null unless a subclass assigns it. */
+        protected AmqpUintBuffer() {
+        }
+        /** Wraps {@code encoded} without decoding it. */
+        protected AmqpUintBuffer(Encoded<Long> encoded) {
+            this.encoded = encoded;
+        }
+        /** Returns the wrapped encoded value. */
+        public final Encoded<Long> getEncoded() throws AmqpEncodingError{
+            return encoded;
+        }
+        /** Writes the encoded value to {@code out}; the marshaller argument is not used. */
+        public final void marshal(DataOutput out, AmqpMarshaller marshaller) throws IOException, AmqpEncodingError{
+            encoded.marshal(out);
+        }
+        /** Returns the decoded value, decoding on first access. */
+        public Long getValue() {
+            return bean().getValue();
+        }
+        /** Returns this buffer itself; the marshaller argument is not used. */
+        public AmqpUint.AmqpUintBuffer getBuffer(AmqpMarshaller marshaller) throws AmqpEncodingError{
+            return this;
+        }
+        /** Returns the decoded bean, building it from encoded.getValue() on the first call and attaching this buffer to it. */
+        protected AmqpUint bean() {
+            if(bean == null) {
+                bean = new AmqpUint.AmqpUintBean(encoded.getValue());
+                bean.buffer = this;
+            }
+            return bean;
+        }
+        /** Delegates to the decoded bean's equals. */
+        public boolean equals(Object o){
+            return bean().equals(o);
+        }
+        /** Delegates to the decoded bean's hashCode. */
+        public int hashCode() {
+            return bean().hashCode();
+        }
+        /** Delegates the equivalence check to the decoded bean. */
+        public boolean equivalent(AmqpType<?, ?> t) {
+            return bean().equivalent(t);
+        }
+        /** Returns null when encoded.isNull(), otherwise a new buffer wrapping encoded. */
+        public static AmqpUint.AmqpUintBuffer create(Encoded<Long> encoded) {
+            if(encoded.isNull()) {
+                return null;
+            }
+            return new AmqpUint.AmqpUintBuffer(encoded);
+        }
+        /** Reads a uint from in via marshaller.unmarshalAmqpUint and wraps the result with create(Encoded). */
+        public static AmqpUint.AmqpUintBuffer create(DataInput in, AmqpMarshaller marshaller) throws IOException, AmqpEncodingError {
+            return create(marshaller.unmarshalAmqpUint(in));
+        }
+        /** Decodes a uint from buffer at offset via marshaller.decodeAmqpUint and wraps the result with create(Encoded). */
+        public static AmqpUint.AmqpUintBuffer create(Buffer buffer, int offset, AmqpMarshaller marshaller) throws AmqpEncodingError {
+            return create(marshaller.decodeAmqpUint(buffer, offset));
+        }
+    }
+}
\ No newline at end of file

Added: activemq/sandbox/activemq-apollo-actor/activemq-amqp/src/main/java/org/apache/activemq/amqp/protocol/types/AmqpUlong.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-amqp/src/main/java/org/apache/activemq/amqp/protocol/types/AmqpUlong.java?rev=908857&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-amqp/src/main/java/org/apache/activemq/amqp/protocol/types/AmqpUlong.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-amqp/src/main/java/org/apache/activemq/amqp/protocol/types/AmqpUlong.java Thu Feb 11 07:04:21 2010
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.amqp.protocol.types;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.math.BigInteger;
+import org.apache.activemq.amqp.protocol.marshaller.AmqpEncodingError;
+import org.apache.activemq.amqp.protocol.marshaller.AmqpMarshaller;
+import org.apache.activemq.amqp.protocol.marshaller.Encoded;
+import org.apache.activemq.util.buffer.Buffer;
+
+/**
+ * Represents an integer in the range 0 to 2^64 - 1
+ */
+public interface AmqpUlong extends AmqpType<AmqpUlong.AmqpUlongBean, AmqpUlong.AmqpUlongBuffer> {
+
+
+    public BigInteger getValue();
+
+    public static class AmqpUlongBean implements AmqpUlong{
+
+        private AmqpUlongBuffer buffer;
+        private AmqpUlongBean bean = this;
+        private BigInteger value;
+
+        protected AmqpUlongBean() {
+        }
+
+        public AmqpUlongBean(BigInteger value) {
+            this.value = value;
+        }
+
+        public AmqpUlongBean(AmqpUlong.AmqpUlongBean other) {
+            this.bean = other.bean; // follow other's delegate: other.value is unset when other is itself a copy
+        }
+
+        public final AmqpUlongBean copy() {
+            return bean;
+        }
+
+        public final AmqpUlong.AmqpUlongBuffer getBuffer(AmqpMarshaller marshaller) throws AmqpEncodingError{
+            if(buffer == null) {
+                buffer = new AmqpUlongBuffer(marshaller.encode(this));
+            }
+            return buffer;
+        }
+
+        public final void marshal(DataOutput out, AmqpMarshaller marshaller) throws IOException, AmqpEncodingError{
+            getBuffer(marshaller).marshal(out, marshaller);
+        }
+
+
+        public BigInteger getValue() {
+            return bean.value;
+        }
+
+
+        public boolean equals(Object o){
+            if(this == o) {
+                return true;
+            }
+
+            if(o == null || !(o instanceof AmqpUlong)) {
+                return false;
+            }
+
+            return equivalent((AmqpUlong) o);
+        }
+
+        public int hashCode() {
+            if(getValue() == null) {
+                return AmqpUlong.AmqpUlongBean.class.hashCode();
+            }
+            return getValue().hashCode();
+        }
+
+        public boolean equivalent(AmqpType<?,?> t){
+            if(this == t) {
+                return true;
+            }
+
+            if(t == null || !(t instanceof AmqpUlong)) {
+                return false;
+            }
+
+            return equivalent((AmqpUlong) t);
+        }
+
+        public boolean equivalent(AmqpUlong b) {
+            if(b == null) {
+                return false;
+            }
+
+            if(b.getValue() == null ^ getValue() == null) {
+                return false;
+            }
+
+            return b.getValue() == null || b.getValue().equals(getValue());
+        }
+    }
+
+    public static class AmqpUlongBuffer implements AmqpUlong, AmqpBuffer< BigInteger> {
+
+        private AmqpUlongBean bean;
+        protected Encoded<BigInteger> encoded;
+
+        protected AmqpUlongBuffer() {
+        }
+
+        protected AmqpUlongBuffer(Encoded<BigInteger> encoded) {
+            this.encoded = encoded;
+        }
+
+        public final Encoded<BigInteger> getEncoded() throws AmqpEncodingError{
+            return encoded;
+        }
+
+        public final void marshal(DataOutput out, AmqpMarshaller marshaller) throws IOException, AmqpEncodingError{
+            encoded.marshal(out);
+        }
+
+        public BigInteger getValue() {
+            return bean().getValue();
+        }
+
+        public AmqpUlong.AmqpUlongBuffer getBuffer(AmqpMarshaller marshaller) throws AmqpEncodingError{
+            return this;
+        }
+
+        protected AmqpUlong bean() {
+            if(bean == null) {
+                bean = new AmqpUlong.AmqpUlongBean(encoded.getValue());
+                bean.buffer = this;
+            }
+            return bean;
+        }
+
+        public boolean equals(Object o){
+            return bean().equals(o);
+        }
+
+        public int hashCode() {
+            return bean().hashCode();
+        }
+
+        public boolean equivalent(AmqpType<?, ?> t) {
+            return bean().equivalent(t);
+        }
+
+        public static AmqpUlong.AmqpUlongBuffer create(Encoded<BigInteger> encoded) {
+            if(encoded.isNull()) {
+                return null;
+            }
+            return new AmqpUlong.AmqpUlongBuffer(encoded);
+        }
+
+        public static AmqpUlong.AmqpUlongBuffer create(DataInput in, AmqpMarshaller marshaller) throws IOException, AmqpEncodingError {
+            return create(marshaller.unmarshalAmqpUlong(in));
+        }
+
+        public static AmqpUlong.AmqpUlongBuffer create(Buffer buffer, int offset, AmqpMarshaller marshaller) throws AmqpEncodingError {
+            return create(marshaller.decodeAmqpUlong(buffer, offset));
+        }
+    }
+}
\ No newline at end of file

Added: activemq/sandbox/activemq-apollo-actor/activemq-amqp/src/main/java/org/apache/activemq/amqp/protocol/types/AmqpUnlink.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-amqp/src/main/java/org/apache/activemq/amqp/protocol/types/AmqpUnlink.java?rev=908857&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-amqp/src/main/java/org/apache/activemq/amqp/protocol/types/AmqpUnlink.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-amqp/src/main/java/org/apache/activemq/amqp/protocol/types/AmqpUnlink.java Thu Feb 11 07:04:21 2010
@@ -0,0 +1,339 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.amqp.protocol.types;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Iterator;
+import org.apache.activemq.amqp.protocol.AmqpCommand;
+import org.apache.activemq.amqp.protocol.AmqpCommandHandler;
+import org.apache.activemq.amqp.protocol.marshaller.AmqpEncodingError;
+import org.apache.activemq.amqp.protocol.marshaller.AmqpMarshaller;
+import org.apache.activemq.amqp.protocol.marshaller.Encoded;
+import org.apache.activemq.amqp.protocol.types.IAmqpList;
+import org.apache.activemq.util.buffer.Buffer;
+
+/**
+ * Represents a command that closes the Link
+ * <p>
+ * Close the Link and un-map the handle.
+ * </p>
+ */
+public interface AmqpUnlink extends AmqpList, AmqpCommand {
+
+
+
+    /**
+     * options map
+     */
+    public void setOptions(AmqpOptions options);
+
+    /**
+     * options map
+     */
+    public AmqpOptions getOptions();
+
+    /**
+     * identifies the Link
+     */
+    public void setHandle(AmqpHandle handle);
+
+    /**
+     * identifies the Link
+     */
+    public AmqpHandle getHandle();
+
+    /**
+     * error causing the unlink
+     * <p>
+     * If set, this field indicates that the Link is being unlinked due to an exceptional
+     * condition. The value of the field should contain details on the cause of the exception.
+     * </p>
+     */
+    public void setException(AmqpLinkError exception);
+
+    /**
+     * error causing the unlink
+     * <p>
+     * If set, this field indicates that the Link is being unlinked due to an exceptional
+     * condition. The value of the field should contain details on the cause of the exception.
+     * </p>
+     */
+    public AmqpLinkError getException();
+
+    public static class AmqpUnlinkBean implements AmqpUnlink{
+
+        private AmqpUnlinkBuffer buffer;
+        private AmqpUnlinkBean bean = this;
+        private AmqpOptions options;
+        private AmqpHandle handle;
+        private AmqpLinkError exception;
+
+        public AmqpUnlinkBean() {
+        }
+
+        public AmqpUnlinkBean(IAmqpList value) {
+            //TODO we should defer decoding of the described type:
+            for(int i = 0; i < value.getListCount(); i++) {
+                set(i, value.get(i));
+            }
+        }
+
+        public AmqpUnlinkBean(AmqpUnlink.AmqpUnlinkBean other) {
+            this.bean = other.bean; // follow other's delegate: other's own fields are unset while it is an unmodified copy
+        }
+
+        public final AmqpUnlinkBean copy() {
+            return new AmqpUnlink.AmqpUnlinkBean(bean);
+        }
+
+        public final void handle(AmqpCommandHandler handler) throws Exception {
+            handler.handleUnlink(this);
+        }
+
+        public final AmqpUnlink.AmqpUnlinkBuffer getBuffer(AmqpMarshaller marshaller) throws AmqpEncodingError{
+            if(buffer == null) {
+                buffer = new AmqpUnlinkBuffer(marshaller.encode(this));
+            }
+            return buffer;
+        }
+
+        public final void marshal(DataOutput out, AmqpMarshaller marshaller) throws IOException, AmqpEncodingError{
+            getBuffer(marshaller).marshal(out, marshaller);
+        }
+
+
+        public final void setOptions(AmqpOptions options) {
+            copyCheck();
+            bean.options = options;
+        }
+
+        public final AmqpOptions getOptions() {
+            return bean.options;
+        }
+
+        public final void setHandle(AmqpHandle handle) {
+            copyCheck();
+            bean.handle = handle;
+        }
+
+        public final AmqpHandle getHandle() {
+            return bean.handle;
+        }
+
+        public final void setException(AmqpLinkError exception) {
+            copyCheck();
+            bean.exception = exception;
+        }
+
+        public final AmqpLinkError getException() {
+            return bean.exception;
+        }
+
+        public void set(int index, AmqpType<?, ?> value) {
+            switch(index) {
+            case 0: {
+                setOptions((AmqpOptions) value);
+                break;
+            }
+            case 1: {
+                setHandle((AmqpHandle) value);
+                break;
+            }
+            case 2: {
+                setException((AmqpLinkError) value);
+                break;
+            }
+            default : {
+                throw new IndexOutOfBoundsException(String.valueOf(index));
+            }
+            }
+        }
+
+        public AmqpType<?, ?> get(int index) {
+            switch(index) {
+            case 0: {
+                return bean.options;
+            }
+            case 1: {
+                return bean.handle;
+            }
+            case 2: {
+                return bean.exception;
+            }
+            default : {
+                throw new IndexOutOfBoundsException(String.valueOf(index));
+            }
+            }
+        }
+
+        public int getListCount() {
+            return 3;
+        }
+
+        public IAmqpList getValue() {
+            return bean;
+        }
+
+        public Iterator<AmqpType<?, ?>> iterator() {
+            return new AmqpListIterator(bean);
+        }
+
+
+        private final void copyCheck() {
+            if(buffer != null) { // an encoded buffer is attached: the bean may no longer be modified
+                throw new IllegalStateException("unwriteable");
+            }
+            if(bean != this) { // still delegating to another bean: take a private copy before writing
+                copy(bean);
+            }
+        }
+
+        private final void copy(AmqpUnlink.AmqpUnlinkBean other) {
+            this.options= other.options;
+            this.handle= other.handle;
+            this.exception= other.exception;
+            bean = this;
+        }
+
+        public boolean equivalent(AmqpType<?,?> t){
+            if(this == t) {
+                return true;
+            }
+
+            if(t == null || !(t instanceof AmqpUnlink)) {
+                return false;
+            }
+
+            return equivalent((AmqpUnlink) t);
+        }
+
+        public boolean equivalent(AmqpUnlink b) {
+            if(b == null) { return false; } // null is never equivalent; guards the dereferences below
+            if(b.getOptions() == null ^ getOptions() == null) {
+                return false;
+            }
+            if(b.getOptions() != null && !b.getOptions().equals(getOptions())){ 
+                return false;
+            }
+
+            if(b.getHandle() == null ^ getHandle() == null) {
+                return false;
+            }
+            if(b.getHandle() != null && !b.getHandle().equals(getHandle())){ 
+                return false;
+            }
+
+            if(b.getException() == null ^ getException() == null) {
+                return false;
+            }
+            if(b.getException() != null && !b.getException().equivalent(getException())){ 
+                return false;
+            }
+            return true;
+        }
+    }
+
+    public static class AmqpUnlinkBuffer extends AmqpList.AmqpListBuffer implements AmqpUnlink{
+
+        private AmqpUnlinkBean bean;
+
+        protected AmqpUnlinkBuffer(Encoded<IAmqpList> encoded) {
+            super(encoded);
+        }
+
+        public final void setOptions(AmqpOptions options) {
+            bean().setOptions(options);
+        }
+
+        public final AmqpOptions getOptions() {
+            return bean().getOptions();
+        }
+
+        public final void setHandle(AmqpHandle handle) {
+            bean().setHandle(handle);
+        }
+
+        public final AmqpHandle getHandle() {
+            return bean().getHandle();
+        }
+
+        public final void setException(AmqpLinkError exception) {
+            bean().setException(exception);
+        }
+
+        public final AmqpLinkError getException() {
+            return bean().getException();
+        }
+
+        public void set(int index, AmqpType<?, ?> value) {
+            bean().set(index, value);
+        }
+
+        public AmqpType<?, ?> get(int index) {
+            return bean().get(index);
+        }
+
+        public int getListCount() {
+            return bean().getListCount();
+        }
+
+        public Iterator<AmqpType<?, ?>> iterator() {
+            return bean().iterator();
+        }
+
+        public IAmqpList getValue() {
+            return bean().getValue();
+        }
+
+        public AmqpUnlink.AmqpUnlinkBuffer getBuffer(AmqpMarshaller marshaller) throws AmqpEncodingError{
+            return this;
+        }
+
+        protected AmqpUnlink bean() {
+            if(bean == null) {
+                bean = new AmqpUnlink.AmqpUnlinkBean(encoded.getValue());
+                bean.buffer = this;
+            }
+            return bean;
+        }
+
+        public final void handle(AmqpCommandHandler handler) throws Exception {
+            handler.handleUnlink(this);
+        }
+
+        public boolean equivalent(AmqpType<?, ?> t) {
+            return bean().equivalent(t);
+        }
+
+        public static AmqpUnlink.AmqpUnlinkBuffer create(Encoded<IAmqpList> encoded) {
+            if(encoded.isNull()) {
+                return null;
+            }
+            return new AmqpUnlink.AmqpUnlinkBuffer(encoded);
+        }
+
+        public static AmqpUnlink.AmqpUnlinkBuffer create(DataInput in, AmqpMarshaller marshaller) throws IOException, AmqpEncodingError {
+            return create(marshaller.unmarshalAmqpUnlink(in));
+        }
+
+        public static AmqpUnlink.AmqpUnlinkBuffer create(Buffer buffer, int offset, AmqpMarshaller marshaller) throws AmqpEncodingError {
+            return create(marshaller.decodeAmqpUnlink(buffer, offset));
+        }
+    }
+}
\ No newline at end of file

Added: activemq/sandbox/activemq-apollo-actor/activemq-amqp/src/main/java/org/apache/activemq/amqp/protocol/types/AmqpUshort.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-amqp/src/main/java/org/apache/activemq/amqp/protocol/types/AmqpUshort.java?rev=908857&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-amqp/src/main/java/org/apache/activemq/amqp/protocol/types/AmqpUshort.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-amqp/src/main/java/org/apache/activemq/amqp/protocol/types/AmqpUshort.java Thu Feb 11 07:04:21 2010
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.amqp.protocol.types;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.lang.Integer;
+import org.apache.activemq.amqp.protocol.marshaller.AmqpEncodingError;
+import org.apache.activemq.amqp.protocol.marshaller.AmqpMarshaller;
+import org.apache.activemq.amqp.protocol.marshaller.Encoded;
+import org.apache.activemq.util.buffer.Buffer;
+
+/**
+ * Represents an integer in the range 0 to 2^16 - 1
+ */
+public interface AmqpUshort extends AmqpType<AmqpUshort.AmqpUshortBean, AmqpUshort.AmqpUshortBuffer> {
+
+
+    public Integer getValue();
+
+    public static class AmqpUshortBean implements AmqpUshort{
+
+        private AmqpUshortBuffer buffer;
+        private AmqpUshortBean bean = this;
+        private Integer value;
+
+        protected AmqpUshortBean() {
+        }
+
+        public AmqpUshortBean(Integer value) {
+            this.value = value;
+        }
+
+        public AmqpUshortBean(AmqpUshort.AmqpUshortBean other) {
+            this.bean = other.bean; // follow other's delegate: other.value is unset when other is itself a copy
+        }
+
+        public final AmqpUshortBean copy() {
+            return bean;
+        }
+
+        public final AmqpUshort.AmqpUshortBuffer getBuffer(AmqpMarshaller marshaller) throws AmqpEncodingError{
+            if(buffer == null) {
+                buffer = new AmqpUshortBuffer(marshaller.encode(this));
+            }
+            return buffer;
+        }
+
+        public final void marshal(DataOutput out, AmqpMarshaller marshaller) throws IOException, AmqpEncodingError{
+            getBuffer(marshaller).marshal(out, marshaller);
+        }
+
+
+        public Integer getValue() {
+            return bean.value;
+        }
+
+
+        public boolean equals(Object o){
+            if(this == o) {
+                return true;
+            }
+
+            if(o == null || !(o instanceof AmqpUshort)) {
+                return false;
+            }
+
+            return equivalent((AmqpUshort) o);
+        }
+
+        public int hashCode() {
+            if(getValue() == null) {
+                return AmqpUshort.AmqpUshortBean.class.hashCode();
+            }
+            return getValue().hashCode();
+        }
+
+        public boolean equivalent(AmqpType<?,?> t){
+            if(this == t) {
+                return true;
+            }
+
+            if(t == null || !(t instanceof AmqpUshort)) {
+                return false;
+            }
+
+            return equivalent((AmqpUshort) t);
+        }
+
+        public boolean equivalent(AmqpUshort b) {
+            if(b == null) {
+                return false;
+            }
+
+            if(b.getValue() == null ^ getValue() == null) {
+                return false;
+            }
+
+            return b.getValue() == null || b.getValue().equals(getValue());
+        }
+    }
+
+    public static class AmqpUshortBuffer implements AmqpUshort, AmqpBuffer< Integer> {
+
+        private AmqpUshortBean bean;
+        protected Encoded<Integer> encoded;
+
+        protected AmqpUshortBuffer() {
+        }
+
+        protected AmqpUshortBuffer(Encoded<Integer> encoded) {
+            this.encoded = encoded;
+        }
+
+        public final Encoded<Integer> getEncoded() throws AmqpEncodingError{
+            return encoded;
+        }
+
+        public final void marshal(DataOutput out, AmqpMarshaller marshaller) throws IOException, AmqpEncodingError{
+            encoded.marshal(out);
+        }
+
+        public Integer getValue() {
+            return bean().getValue();
+        }
+
+        public AmqpUshort.AmqpUshortBuffer getBuffer(AmqpMarshaller marshaller) throws AmqpEncodingError{
+            return this;
+        }
+
+        protected AmqpUshort bean() {
+            if(bean == null) {
+                bean = new AmqpUshort.AmqpUshortBean(encoded.getValue());
+                bean.buffer = this;
+            }
+            return bean;
+        }
+
+        public boolean equals(Object o){
+            return bean().equals(o);
+        }
+
+        public int hashCode() {
+            return bean().hashCode();
+        }
+
+        public boolean equivalent(AmqpType<?, ?> t) {
+            return bean().equivalent(t);
+        }
+
+        public static AmqpUshort.AmqpUshortBuffer create(Encoded<Integer> encoded) {
+            if(encoded.isNull()) {
+                return null;
+            }
+            return new AmqpUshort.AmqpUshortBuffer(encoded);
+        }
+
+        public static AmqpUshort.AmqpUshortBuffer create(DataInput in, AmqpMarshaller marshaller) throws IOException, AmqpEncodingError {
+            return create(marshaller.unmarshalAmqpUshort(in));
+        }
+
+        public static AmqpUshort.AmqpUshortBuffer create(Buffer buffer, int offset, AmqpMarshaller marshaller) throws AmqpEncodingError {
+            return create(marshaller.decodeAmqpUshort(buffer, offset));
+        }
+    }
+}
\ No newline at end of file

Added: activemq/sandbox/activemq-apollo-actor/activemq-amqp/src/main/java/org/apache/activemq/amqp/protocol/types/AmqpUuid.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-amqp/src/main/java/org/apache/activemq/amqp/protocol/types/AmqpUuid.java?rev=908857&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-amqp/src/main/java/org/apache/activemq/amqp/protocol/types/AmqpUuid.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-amqp/src/main/java/org/apache/activemq/amqp/protocol/types/AmqpUuid.java Thu Feb 11 07:04:21 2010
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.amqp.protocol.types;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.UUID;
+import org.apache.activemq.amqp.protocol.marshaller.AmqpEncodingError;
+import org.apache.activemq.amqp.protocol.marshaller.AmqpMarshaller;
+import org.apache.activemq.amqp.protocol.marshaller.Encoded;
+import org.apache.activemq.util.buffer.Buffer;
+
+/**
+ * Represents a universally unique id as defined by RFC-4122 section 4.1.2
+ */
+public interface AmqpUuid extends AmqpType<AmqpUuid.AmqpUuidBean, AmqpUuid.AmqpUuidBuffer> {
+
+
+    public UUID getValue();
+
+    public static class AmqpUuidBean implements AmqpUuid{
+
+        private AmqpUuidBuffer buffer;
+        private AmqpUuidBean bean = this;
+        private UUID value;
+
+        protected AmqpUuidBean() {
+        }
+
+        public AmqpUuidBean(UUID value) {
+            this.value = value;
+        }
+
+        public AmqpUuidBean(AmqpUuid.AmqpUuidBean other) {
+            this.bean = other.bean; // follow other's delegate: other.value is unset when other is itself a copy
+        }
+
+        public final AmqpUuidBean copy() {
+            return bean;
+        }
+
+        public final AmqpUuid.AmqpUuidBuffer getBuffer(AmqpMarshaller marshaller) throws AmqpEncodingError{
+            if(buffer == null) {
+                buffer = new AmqpUuidBuffer(marshaller.encode(this));
+            }
+            return buffer;
+        }
+
+        public final void marshal(DataOutput out, AmqpMarshaller marshaller) throws IOException, AmqpEncodingError{
+            getBuffer(marshaller).marshal(out, marshaller);
+        }
+
+
+        public UUID getValue() {
+            return bean.value;
+        }
+
+
+        public boolean equals(Object o){
+            if(this == o) {
+                return true;
+            }
+
+            if(o == null || !(o instanceof AmqpUuid)) {
+                return false;
+            }
+
+            return equivalent((AmqpUuid) o);
+        }
+
+        public int hashCode() {
+            if(getValue() == null) {
+                return AmqpUuid.AmqpUuidBean.class.hashCode();
+            }
+            return getValue().hashCode();
+        }
+
+        public boolean equivalent(AmqpType<?,?> t){
+            if(this == t) {
+                return true;
+            }
+
+            if(t == null || !(t instanceof AmqpUuid)) {
+                return false;
+            }
+
+            return equivalent((AmqpUuid) t);
+        }
+
+        public boolean equivalent(AmqpUuid b) {
+            if(b == null) {
+                return false;
+            }
+
+            if(b.getValue() == null ^ getValue() == null) {
+                return false;
+            }
+
+            return b.getValue() == null || b.getValue().equals(getValue());
+        }
+    }
+
+    public static class AmqpUuidBuffer implements AmqpUuid, AmqpBuffer< UUID> {
+
+        private AmqpUuidBean bean;
+        protected Encoded<UUID> encoded;
+
+        protected AmqpUuidBuffer() {
+        }
+
+        protected AmqpUuidBuffer(Encoded<UUID> encoded) {
+            this.encoded = encoded;
+        }
+
+        public final Encoded<UUID> getEncoded() throws AmqpEncodingError{
+            return encoded;
+        }
+
+        public final void marshal(DataOutput out, AmqpMarshaller marshaller) throws IOException, AmqpEncodingError{
+            encoded.marshal(out);
+        }
+
+        public UUID getValue() {
+            return bean().getValue();
+        }
+
+        public AmqpUuid.AmqpUuidBuffer getBuffer(AmqpMarshaller marshaller) throws AmqpEncodingError{
+            return this;
+        }
+
+        protected AmqpUuid bean() {
+            if(bean == null) {
+                bean = new AmqpUuid.AmqpUuidBean(encoded.getValue());
+                bean.buffer = this;
+            }
+            return bean;
+        }
+
+        public boolean equals(Object o){
+            return bean().equals(o);
+        }
+
+        public int hashCode() {
+            return bean().hashCode();
+        }
+
+        public boolean equivalent(AmqpType<?, ?> t) {
+            return bean().equivalent(t);
+        }
+
+        public static AmqpUuid.AmqpUuidBuffer create(Encoded<UUID> encoded) {
+            if(encoded.isNull()) {
+                return null;
+            }
+            return new AmqpUuid.AmqpUuidBuffer(encoded);
+        }
+
+        public static AmqpUuid.AmqpUuidBuffer create(DataInput in, AmqpMarshaller marshaller) throws IOException, AmqpEncodingError {
+            return create(marshaller.unmarshalAmqpUuid(in));
+        }
+
+        public static AmqpUuid.AmqpUuidBuffer create(Buffer buffer, int offset, AmqpMarshaller marshaller) throws AmqpEncodingError {
+            return create(marshaller.decodeAmqpUuid(buffer, offset));
+        }
+    }
+}
\ No newline at end of file

Added: activemq/sandbox/activemq-apollo-actor/activemq-amqp/src/main/java/org/apache/activemq/amqp/protocol/types/AmqpXid.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-amqp/src/main/java/org/apache/activemq/amqp/protocol/types/AmqpXid.java?rev=908857&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-amqp/src/main/java/org/apache/activemq/amqp/protocol/types/AmqpXid.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-amqp/src/main/java/org/apache/activemq/amqp/protocol/types/AmqpXid.java Thu Feb 11 07:04:21 2010
@@ -0,0 +1,364 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.amqp.protocol.types;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.lang.Long;
+import java.util.Iterator;
+import org.apache.activemq.amqp.protocol.marshaller.AmqpEncodingError;
+import org.apache.activemq.amqp.protocol.marshaller.AmqpMarshaller;
+import org.apache.activemq.amqp.protocol.marshaller.Encoded;
+import org.apache.activemq.amqp.protocol.types.IAmqpList;
+import org.apache.activemq.util.buffer.Buffer;
+
+/**
+ * Represents a dtx branch identifier
+ * <p>
+ * An xid uniquely identifies a transaction branch.
+ * </p>
+ */
+public interface AmqpXid extends AmqpList {
+
+    /**
+     * Sets the implementation specific format code.
+     */
+    public void setFormat(Long format);
+
+    /**
+     * Sets the implementation specific format code.
+     */
+    public void setFormat(AmqpUint format);
+
+    /**
+     * Returns the implementation specific format code, or {@code null} if it
+     * has not been set.
+     */
+    public Long getFormat();
+
+    /**
+     * Sets the global transaction id.
+     */
+    public void setGlobalId(Buffer globalId);
+
+    /**
+     * Sets the global transaction id.
+     */
+    public void setGlobalId(AmqpBinary globalId);
+
+    /**
+     * Returns the global transaction id, or {@code null} if it has not been
+     * set.
+     */
+    public Buffer getGlobalId();
+
+    /**
+     * Sets the branch qualifier.
+     */
+    public void setBranchId(Buffer branchId);
+
+    /**
+     * Sets the branch qualifier.
+     */
+    public void setBranchId(AmqpBinary branchId);
+
+    /**
+     * Returns the branch qualifier, or {@code null} if it has not been set.
+     */
+    public Buffer getBranchId();
+
+    /**
+     * In-memory form of an {@link AmqpXid}.
+     * <p>
+     * A bean created by {@link #copy()} reads through to the bean it was
+     * copied from until its first write; the write first copies the three
+     * fields locally. Once a buffer has been attached, writes are rejected
+     * with an {@link IllegalStateException}.
+     * </p>
+     */
+    public static class AmqpXidBean implements AmqpXid{
+
+        private AmqpXidBuffer buffer;
+        // The bean whose fields are read: this instance, or the copy source
+        // until the first write.
+        private AmqpXidBean bean = this;
+        private AmqpUint format;
+        private AmqpBinary globalId;
+        private AmqpBinary branchId;
+
+        public AmqpXidBean() {
+        }
+
+        /**
+         * Populates the bean by passing each element of {@code value}, by
+         * index, to {@link #set(int, AmqpType)}.
+         */
+        public AmqpXidBean(IAmqpList value) {
+            //TODO we should defer decoding of the described type:
+            for(int i = 0; i < value.getListCount(); i++) {
+                set(i, value.get(i));
+            }
+        }
+
+        /**
+         * Creates a bean that reads through to {@code other} until it is
+         * first written to.
+         */
+        public AmqpXidBean(AmqpXid.AmqpXidBean other) {
+            this.bean = other;
+        }
+
+        public final AmqpXidBean copy() {
+            return new AmqpXid.AmqpXidBean(bean);
+        }
+
+        /**
+         * Returns the buffer for this bean, encoding it with
+         * {@code marshaller} on the first call and reusing the result after.
+         */
+        public final AmqpXid.AmqpXidBuffer getBuffer(AmqpMarshaller marshaller) throws AmqpEncodingError{
+            if(buffer == null) {
+                buffer = new AmqpXidBuffer(marshaller.encode(this));
+            }
+            return buffer;
+        }
+
+        public final void marshal(DataOutput out, AmqpMarshaller marshaller) throws IOException, AmqpEncodingError{
+            getBuffer(marshaller).marshal(out, marshaller);
+        }
+
+        public void setFormat(Long format) {
+            setFormat(new AmqpUint.AmqpUintBean(format));
+        }
+
+        public final void setFormat(AmqpUint format) {
+            copyCheck();
+            bean.format = format;
+        }
+
+        public final Long getFormat() {
+            // An unset field is null; report it as null instead of throwing,
+            // which is what the null checks in equivalent() expect.
+            return bean.format == null ? null : bean.format.getValue();
+        }
+
+        public void setGlobalId(Buffer globalId) {
+            setGlobalId(new AmqpBinary.AmqpBinaryBean(globalId));
+        }
+
+        public final void setGlobalId(AmqpBinary globalId) {
+            copyCheck();
+            bean.globalId = globalId;
+        }
+
+        public final Buffer getGlobalId() {
+            return bean.globalId == null ? null : bean.globalId.getValue();
+        }
+
+        public void setBranchId(Buffer branchId) {
+            setBranchId(new AmqpBinary.AmqpBinaryBean(branchId));
+        }
+
+        public final void setBranchId(AmqpBinary branchId) {
+            copyCheck();
+            bean.branchId = branchId;
+        }
+
+        public final Buffer getBranchId() {
+            return bean.branchId == null ? null : bean.branchId.getValue();
+        }
+
+        /**
+         * Sets a field by list position: 0 = format, 1 = global id,
+         * 2 = branch id.
+         *
+         * @throws IndexOutOfBoundsException for any other index
+         */
+        public void set(int index, AmqpType<?, ?> value) {
+            switch(index) {
+            case 0: {
+                setFormat((AmqpUint) value);
+                break;
+            }
+            case 1: {
+                setGlobalId((AmqpBinary) value);
+                break;
+            }
+            case 2: {
+                setBranchId((AmqpBinary) value);
+                break;
+            }
+            default : {
+                throw new IndexOutOfBoundsException(String.valueOf(index));
+            }
+            }
+        }
+
+        /**
+         * Returns a field by list position: 0 = format, 1 = global id,
+         * 2 = branch id.
+         *
+         * @throws IndexOutOfBoundsException for any other index
+         */
+        public AmqpType<?, ?> get(int index) {
+            switch(index) {
+            case 0: {
+                return bean.format;
+            }
+            case 1: {
+                return bean.globalId;
+            }
+            case 2: {
+                return bean.branchId;
+            }
+            default : {
+                throw new IndexOutOfBoundsException(String.valueOf(index));
+            }
+            }
+        }
+
+        public int getListCount() {
+            return 3;
+        }
+
+        public IAmqpList getValue() {
+            return bean;
+        }
+
+        public Iterator<AmqpType<?, ?>> iterator() {
+            return new AmqpListIterator(bean);
+        }
+
+        /**
+         * Guards every write: rejects it once a buffer is attached, and
+         * otherwise detaches this bean from its copy source first.
+         */
+        private final void copyCheck() {
+            if(buffer != null) {
+                throw new IllegalStateException("unwriteable");
+            }
+            if(bean != this) {
+                copy(bean);
+            }
+        }
+
+        private final void copy(AmqpXid.AmqpXidBean other) {
+            this.format = other.format;
+            this.globalId = other.globalId;
+            this.branchId = other.branchId;
+            bean = this;
+        }
+
+        public boolean equivalent(AmqpType<?,?> t){
+            if(this == t) {
+                return true;
+            }
+
+            // instanceof is false for null, so no separate null check is needed.
+            if(!(t instanceof AmqpXid)) {
+                return false;
+            }
+
+            return equivalent((AmqpXid) t);
+        }
+
+        /**
+         * Returns true when format, global id and branch id each match: both
+         * sides null, or both non-null and equal.
+         */
+        public boolean equivalent(AmqpXid b) {
+            return sameValue(b.getFormat(), getFormat())
+                    && sameValue(b.getGlobalId(), getGlobalId())
+                    && sameValue(b.getBranchId(), getBranchId());
+        }
+
+        /**
+         * Null-tolerant equality: true when both values are null, or both are
+         * non-null and {@code theirs.equals(mine)}.
+         */
+        private static boolean sameValue(Object theirs, Object mine) {
+            if(theirs == null) {
+                return mine == null;
+            }
+            return mine != null && theirs.equals(mine);
+        }
+    }
+
+    /**
+     * Encoded form of an {@link AmqpXid}. Every accessor delegates to a bean
+     * that is decoded from the encoded value on first use.
+     */
+    public static class AmqpXidBuffer extends AmqpList.AmqpListBuffer implements AmqpXid{
+
+        private AmqpXidBean bean;
+
+        protected AmqpXidBuffer(Encoded<IAmqpList> encoded) {
+            super(encoded);
+        }
+
+        public void setFormat(Long format) {
+            bean().setFormat(format);
+        }
+
+        public final void setFormat(AmqpUint format) {
+            bean().setFormat(format);
+        }
+
+        public final Long getFormat() {
+            return bean().getFormat();
+        }
+
+        public void setGlobalId(Buffer globalId) {
+            bean().setGlobalId(globalId);
+        }
+
+        public final void setGlobalId(AmqpBinary globalId) {
+            bean().setGlobalId(globalId);
+        }
+
+        public final Buffer getGlobalId() {
+            return bean().getGlobalId();
+        }
+
+        public void setBranchId(Buffer branchId) {
+            bean().setBranchId(branchId);
+        }
+
+        public final void setBranchId(AmqpBinary branchId) {
+            bean().setBranchId(branchId);
+        }
+
+        public final Buffer getBranchId() {
+            return bean().getBranchId();
+        }
+
+        public void set(int index, AmqpType<?, ?> value) {
+            bean().set(index, value);
+        }
+
+        public AmqpType<?, ?> get(int index) {
+            return bean().get(index);
+        }
+
+        public int getListCount() {
+            return bean().getListCount();
+        }
+
+        public Iterator<AmqpType<?, ?>> iterator() {
+            return bean().iterator();
+        }
+
+        public IAmqpList getValue() {
+            return bean().getValue();
+        }
+
+        /**
+         * Returns this instance; the marshaller argument is not consulted.
+         */
+        public AmqpXid.AmqpXidBuffer getBuffer(AmqpMarshaller marshaller) throws AmqpEncodingError{
+            return this;
+        }
+
+        /**
+         * Returns the bean for this buffer, decoding it from the encoded
+         * value on the first call. The bean is linked back to this buffer,
+         * which makes it reject later writes.
+         */
+        protected AmqpXid bean() {
+            if(bean == null) {
+                bean = new AmqpXid.AmqpXidBean(encoded.getValue());
+                bean.buffer = this;
+            }
+            return bean;
+        }
+
+        public boolean equivalent(AmqpType<?, ?> t) {
+            return bean().equivalent(t);
+        }
+
+        /**
+         * Wraps an encoded list in a buffer.
+         *
+         * @return a new buffer, or {@code null} when {@code encoded.isNull()}
+         */
+        public static AmqpXid.AmqpXidBuffer create(Encoded<IAmqpList> encoded) {
+            if(encoded.isNull()) {
+                return null;
+            }
+            return new AmqpXid.AmqpXidBuffer(encoded);
+        }
+
+        public static AmqpXid.AmqpXidBuffer create(DataInput in, AmqpMarshaller marshaller) throws IOException, AmqpEncodingError {
+            return create(marshaller.unmarshalAmqpXid(in));
+        }
+
+        public static AmqpXid.AmqpXidBuffer create(Buffer buffer, int offset, AmqpMarshaller marshaller) throws AmqpEncodingError {
+            return create(marshaller.decodeAmqpXid(buffer, offset));
+        }
+    }
+}
\ No newline at end of file

Added: activemq/sandbox/activemq-apollo-actor/activemq-amqp/src/main/java/org/apache/activemq/amqp/protocol/types/IAmqpList.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-amqp/src/main/java/org/apache/activemq/amqp/protocol/types/IAmqpList.java?rev=908857&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-amqp/src/main/java/org/apache/activemq/amqp/protocol/types/IAmqpList.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-amqp/src/main/java/org/apache/activemq/amqp/protocol/types/IAmqpList.java Thu Feb 11 07:04:21 2010
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this work
+ * for additional information regarding copyright ownership. The ASF licenses
+ * this file to You under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.activemq.amqp.protocol.types;
+
+import java.util.Iterator;
+
+/**
+ * An indexed, fixed-position list of AMQP values.
+ */
+public interface IAmqpList extends Iterable<AmqpType<?, ?>> {
+
+    /**
+     * Returns the element at {@code index}.
+     */
+    public AmqpType<?, ?> get(int index);
+
+    /**
+     * Stores {@code value} at {@code index}.
+     */
+    public void set(int index, AmqpType<?, ?> value);
+
+    /**
+     * Returns the number of positions in the list.
+     */
+    public int getListCount();
+
+    /**
+     * Read-only iterator that walks an {@link IAmqpList} from index 0 up to
+     * {@code getListCount() - 1}.
+     */
+    public static class AmqpListIterator implements Iterator<AmqpType<?, ?>> {
+        int next = 0;
+        final IAmqpList list;
+
+        public AmqpListIterator(IAmqpList list) {
+            this.list = list;
+        }
+
+        public boolean hasNext() {
+            return next < list.getListCount();
+        }
+
+        /**
+         * Returns the next element and advances the cursor.
+         *
+         * @throws java.util.NoSuchElementException if the iteration has no
+         *         more elements
+         */
+        public AmqpType<?, ?> next() {
+            // The Iterator contract requires NoSuchElementException past the
+            // end, rather than whatever list.get() throws for a bad index.
+            if (!hasNext()) {
+                throw new java.util.NoSuchElementException();
+            }
+            return list.get(next++);
+        }
+
+        /**
+         * Not supported.
+         *
+         * @throws UnsupportedOperationException always
+         */
+        public void remove() {
+            throw new UnsupportedOperationException();
+        }
+    }
+}

Added: activemq/sandbox/activemq-apollo-actor/activemq-amqp/src/main/java/org/apache/activemq/amqp/state/ConnectionState.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-amqp/src/main/java/org/apache/activemq/amqp/state/ConnectionState.java?rev=908857&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-amqp/src/main/java/org/apache/activemq/amqp/state/ConnectionState.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-amqp/src/main/java/org/apache/activemq/amqp/state/ConnectionState.java Thu Feb 11 07:04:21 2010
@@ -0,0 +1,11 @@
+package org.apache.activemq.amqp.state;
+
+import org.apache.activemq.amqp.protocol.types.AmqpOpen;
+
+
+/**
+ * Placeholder with no fields or behavior yet.
+ * NOTE(review): presumably intended to track per-connection AMQP state; confirm
+ * against later revisions.
+ */
+public class ConnectionState {
+
+    /**
+     * Creates the state object. The open command is accepted but is currently
+     * neither stored nor inspected.
+     */
+    public ConnectionState(AmqpOpen openCommand) {
+        
+    }
+}

Added: activemq/sandbox/activemq-apollo-actor/activemq-amqp/src/main/java/org/apache/activemq/amqp/wireformat/AmqpWireFormat.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-amqp/src/main/java/org/apache/activemq/amqp/wireformat/AmqpWireFormat.java?rev=908857&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-amqp/src/main/java/org/apache/activemq/amqp/wireformat/AmqpWireFormat.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-amqp/src/main/java/org/apache/activemq/amqp/wireformat/AmqpWireFormat.java Thu Feb 11 07:04:21 2010
@@ -0,0 +1,100 @@
+package org.apache.activemq.amqp.wireformat;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.activemq.amqp.protocol.Definitions;
+import org.apache.activemq.amqp.protocol.marshaller.AmqpEncodingError;
+import org.apache.activemq.amqp.protocol.marshaller.AmqpMarshaller;
+import org.apache.activemq.amqp.protocol.types.AmqpType;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.util.buffer.Buffer;
+import org.apache.activemq.wireformat.WireFormat;
+import org.apache.activemq.wireformat.WireFormatFactory;
+
+public class AmqpWireFormat implements WireFormat {
+
+    public static final int DEFAULT_VERSION = 1;
+    public static final String WIREFORMAT_NAME = "amqp";
+    private int version;
+    private AtomicBoolean receivingMessage = new AtomicBoolean(false);
+
+    private long maxFrameSize = Long.valueOf(Definitions.MIN_MAX_FRAME_SIZE);
+    private long heartBeatInterval;
+
+    private AmqpMarshaller marshaller;
+
+    AmqpWireFormat(int version) {
+        this.version = version;
+    }
+
+    @SuppressWarnings("unchecked")
+    public Transport createTransportFilters(Transport transport, Map options) {
+        return transport;
+    }
+
+    public String getName() {
+        return WIREFORMAT_NAME;
+    }
+
+    public void setMaxFrameSize(long maxFrameSize) {
+        this.maxFrameSize = maxFrameSize;
+    }
+
+    public long getMaxFrameSize() {
+        return maxFrameSize;
+    }
+
+    public long getHeartBeatInterval() {
+        return heartBeatInterval;
+    }
+
+    public void setHeartBeatInterval(long heartBeatInterval) {
+        this.heartBeatInterval = heartBeatInterval;
+    }
+
+    public int getVersion() {
+        return version;
+    }
+
+    public WireFormatFactory getWireFormatFactory() {
+        return new AmqpWireFormatFactory();
+    }
+
+    public boolean inReceive() {
+        return receivingMessage.get();
+    }
+
+    public Buffer marshal(Object command) throws IOException {
+        return ((AmqpType<?, ?>) command).getBuffer(marshaller).getEncoded().getBuffer();
+    }
+
+    public void marshal(Object command, DataOutput out) throws IOException {
+        ((AmqpType<?, ?>) command).marshal(out, marshaller);
+    }
+
+    public void setVersion(int version) {
+        this.version = version;
+    }
+
+    public Object unmarshal(Buffer buffer) throws IOException {
+        try {
+            return marshaller.decodeType(buffer);
+        } catch (AmqpEncodingError error) {
+            throw new IOException(error.getMessage(), error);
+        }
+    }
+
+    public Object unmarshal(DataInput in) throws IOException {
+        receivingMessage.set(true);
+        try {
+            return marshaller.unmarshalType(in);
+        } finally {
+            receivingMessage.set(false);
+        }
+    }
+
+}

Added: activemq/sandbox/activemq-apollo-actor/activemq-amqp/src/main/java/org/apache/activemq/amqp/wireformat/AmqpWireFormatFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-amqp/src/main/java/org/apache/activemq/amqp/wireformat/AmqpWireFormatFactory.java?rev=908857&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-amqp/src/main/java/org/apache/activemq/amqp/wireformat/AmqpWireFormatFactory.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-amqp/src/main/java/org/apache/activemq/amqp/wireformat/AmqpWireFormatFactory.java Thu Feb 11 07:04:21 2010
@@ -0,0 +1,33 @@
+package org.apache.activemq.amqp.wireformat;
+
+import java.io.*;
+
+import org.apache.activemq.amqp.protocol.marshaller.AmqpMarshaller;
+import org.apache.activemq.amqp.protocol.types.*;
+import org.apache.activemq.util.buffer.Buffer;
+import org.apache.activemq.wireformat.WireFormat;
+import org.apache.activemq.wireformat.WireFormatFactory;
+
+/**
+ * Factory for {@link AmqpWireFormat} instances. It recognizes the wire format
+ * by the four bytes {@code 'A','M','Q','P'} at the start of a buffer.
+ */
+public class AmqpWireFormatFactory implements WireFormatFactory {
+
+    private static final Buffer MAGIC = new Buffer(new byte[] { 'A', 'M', 'Q', 'P' });
+
+    /**
+     * Creates a wire format with version 1.
+     */
+    public WireFormat createWireFormat() {
+        final int wireVersion = 1;
+        return new AmqpWireFormat(wireVersion);
+    }
+
+    public boolean isDiscriminatable() {
+        return true;
+    }
+
+    /**
+     * Returns true when {@code buffer} is at least as long as the magic bytes
+     * and contains them at offset 0.
+     */
+    public boolean matchesWireformatHeader(Buffer buffer) {
+        return buffer.length >= MAGIC.length && buffer.containsAt(MAGIC, 0);
+    }
+
+    public int maxWireformatHeaderLength() {
+        return 8;
+    }
+}

Added: activemq/sandbox/activemq-apollo-actor/activemq-amqp/src/test/java/org/apache/activemq/amqp/protocol/AmqpProtocolTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-amqp/src/test/java/org/apache/activemq/amqp/protocol/AmqpProtocolTest.java?rev=908857&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-amqp/src/test/java/org/apache/activemq/amqp/protocol/AmqpProtocolTest.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-amqp/src/test/java/org/apache/activemq/amqp/protocol/AmqpProtocolTest.java Thu Feb 11 07:04:21 2010
@@ -0,0 +1,52 @@
+package org.apache.activemq.amqp.protocol;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.activemq.amqp.protocol.marshaller.AmqpEncodingError;
+import org.apache.activemq.amqp.protocol.marshaller.AmqpMarshaller;
+import org.apache.activemq.amqp.protocol.types.AmqpFlow;
+import org.apache.activemq.amqp.protocol.types.AmqpHandle;
+import org.apache.activemq.amqp.protocol.types.AmqpOptions;
+import org.apache.activemq.amqp.protocol.types.AmqpSequenceNo;
+import org.apache.activemq.amqp.protocol.types.AmqpType;
+
+import junit.framework.TestCase;
+
+public class AmqpProtocolTest extends TestCase {
+
+    public void testSequencNumber() throws Exception {
+        AmqpSequenceNo val1 = new AmqpSequenceNo.AmqpSequenceNoBean(10L);
+        AmqpSequenceNo val2 = new AmqpSequenceNo.AmqpSequenceNoBean(10L);
+        assertTrue(val1.equivalent(val2));
+        
+    }
+    
+    public void testAmqpFlow() throws Exception {
+        AmqpFlow val = new AmqpFlow.AmqpFlowBean();
+        val.setHandle(new AmqpHandle.AmqpHandleBean(1L));
+        AmqpFlow read = marshalUnmarshal(val);
+        AmqpHandle handle = read.getHandle();
+        
+        AmqpSequenceNo seq = read.getLimit();
+        AmqpOptions options = read.getOptions();
+        
+        assertTrue(val.equivalent(read));
+        System.out.println("Value: " + read.getValue());
+    }
+
+    private <T extends AmqpType<?, ?>> T marshalUnmarshal(T type) throws IOException, AmqpEncodingError {
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        DataOutputStream out = new DataOutputStream(bos);
+
+        AmqpMarshaller marshaller = org.apache.activemq.amqp.protocol.marshaller.v1_0_0.AmqpMarshaller.getMarshaller();
+        type.marshal(out, marshaller);
+        out.flush();
+
+        DataInputStream in = new DataInputStream(new ByteArrayInputStream(bos.toByteArray()));
+        return (T) marshaller.unmarshalType(in);
+    }
+}



Mime
View raw message