qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rgodf...@apache.org
Subject svn commit: r1457505 [12/14] - in /qpid/trunk/qpid/java: amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/ amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/...
Date Sun, 17 Mar 2013 18:03:43 GMT
Modified: qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SendingLinkEndpoint.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SendingLinkEndpoint.java?rev=1457505&r1=1457504&r2=1457505&view=diff
==============================================================================
--- qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SendingLinkEndpoint.java (original)
+++ qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SendingLinkEndpoint.java Sun Mar 17 18:03:37 2013
@@ -1,214 +1,214 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.amqp_1_0.transport;
-
-import org.apache.qpid.amqp_1_0.type.*;
-import org.apache.qpid.amqp_1_0.type.transport.Attach;
-import org.apache.qpid.amqp_1_0.type.transport.Flow;
-import org.apache.qpid.amqp_1_0.type.transport.Role;
-import org.apache.qpid.amqp_1_0.type.transport.Transfer;
-
-import java.util.HashMap;
-import java.util.Map;
-
-public class SendingLinkEndpoint extends LinkEndpoint<SendingLinkListener>
-{
-
-    private UnsignedInteger _lastDeliveryId;
-    private Binary _lastDeliveryTag;
-    private Map<Binary, UnsignedInteger> _unsettledMap = new HashMap<Binary, UnsignedInteger>();
-    private Binary _transactionId;
-
-    public SendingLinkEndpoint(final SessionEndpoint sessionEndpoint, String name)
-    {
-        this(sessionEndpoint, name, null);
-    }
-
-    public SendingLinkEndpoint(final SessionEndpoint sessionEndpoint, String name, Map<Binary, Outcome> unsettled)
-    {
-        super(sessionEndpoint, name, unsettled);
-        init();
-    }
-
-    public SendingLinkEndpoint(final SessionEndpoint sessionEndpoint, final Attach attach)
-    {
-        super(sessionEndpoint, attach);
-        setSendingSettlementMode(attach.getSndSettleMode());
-        setReceivingSettlementMode(attach.getRcvSettleMode());
-        init();
-    }
-
-    private void init()
-    {
-        setDeliveryCount(UnsignedInteger.valueOf(0));
-        setAvailable(UnsignedInteger.valueOf(0));
-        setLinkEventListener(SendingLinkListener.DEFAULT);
-    }
-
-    @Override public Role getRole()
-    {
-        return Role.SENDER;
-    }
-
-    public boolean transfer(final Transfer xfr)
-    {
-        SessionEndpoint s = getSession();
-        int transferCount;
-        transferCount = _lastDeliveryTag == null ? 1 : 1;
-        xfr.setMessageFormat(UnsignedInteger.ZERO);
-        synchronized(getLock())
-        {
-
-            final int currentCredit = getLinkCredit().intValue() - transferCount;
-
-            if(currentCredit < 0)
-            {
-                return false;
-            }
-            else
-            {
-                setLinkCredit(UnsignedInteger.valueOf((int)currentCredit));
-            }
-
-            setDeliveryCount(UnsignedInteger.valueOf((getDeliveryCount().intValue() + transferCount)));
-
-            xfr.setHandle(getLocalHandle());
-
-            s.sendTransfer(xfr, this, !xfr.getDeliveryTag().equals(_lastDeliveryTag));
-
-            if(!Boolean.TRUE.equals(xfr.getSettled()))
-            {
-                _unsettledMap.put(xfr.getDeliveryTag(), xfr.getDeliveryId());
-            }
-        }
-
-        if(Boolean.TRUE.equals(xfr.getMore()))
-        {
-            _lastDeliveryTag = xfr.getDeliveryTag();
-        }
-        else
-        {
-            _lastDeliveryTag = null;
-        }
-
-        return true;
-    }
-
-
-    public void drained()
-    {
-        synchronized (getLock())
-        {
-            setDeliveryCount(getDeliveryCount().add(getLinkCredit()));
-            setLinkCredit(UnsignedInteger.ZERO);
-            sendFlow();
-        }
-    }
-
-    @Override
-    public void receiveFlow(final Flow flow)
-    {
-        super.receiveFlow(flow);
-        UnsignedInteger t = flow.getDeliveryCount();
-        UnsignedInteger c = flow.getLinkCredit();
-        setDrain(flow.getDrain());
-
-        Map options;
-        if((options = flow.getProperties()) != null)
-        {
-             _transactionId = (Binary) options.get(Symbol.valueOf("txn-id"));
-        }
-
-        if(t == null)
-        {
-            setLinkCredit(c);
-        }
-        else
-        {
-            UnsignedInteger limit = t.add(c);
-            if(limit.compareTo(getDeliveryCount())<=0)
-            {
-                setLinkCredit(UnsignedInteger.valueOf(0));
-            }
-            else
-            {
-                setLinkCredit(limit.subtract(getDeliveryCount()));
-            }
-        }
-        getLinkEventListener().flowStateChanged();
-
-    }
-
-    @Override
-    public void flowStateChanged()
-    {
-        getLinkEventListener().flowStateChanged();
-    }
-
-    public boolean hasCreditToSend()
-    {
-        UnsignedInteger linkCredit = getLinkCredit();
-        return linkCredit != null && (linkCredit.compareTo(UnsignedInteger.valueOf(0)) > 0)
-               && getSession().hasCreditToSend();
-    }
-
-    public void receiveDeliveryState(final Delivery unsettled,
-                                               final DeliveryState state,
-                                               final Boolean settled)
-    {
-        super.receiveDeliveryState(unsettled, state, settled);
-        if(settled)
-        {
-            _unsettledMap.remove(unsettled.getDeliveryTag());
-        }
-    }
-
-    public UnsignedInteger getLastDeliveryId()
-    {
-        return _lastDeliveryId;
-    }
-
-    public void setLastDeliveryId(final UnsignedInteger deliveryId)
-    {
-        _lastDeliveryId = deliveryId;
-    }
-
-    public void updateDisposition(final Binary deliveryTag, DeliveryState state, boolean settled)
-    {
-        synchronized(getLock())
-        {
-            UnsignedInteger deliveryId;
-            if(settled && (deliveryId = _unsettledMap.remove(deliveryTag))!=null)
-            {
-                settle(deliveryTag);
-                getSession().updateDisposition(getRole(), deliveryId, deliveryId, state, settled);
-            }
-
-        }
-    }
-
-    public Binary getTransactionId()
-    {
-        return _transactionId;
-    }
-
-}
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.amqp_1_0.transport;
+
+import org.apache.qpid.amqp_1_0.type.*;
+import org.apache.qpid.amqp_1_0.type.transport.Attach;
+import org.apache.qpid.amqp_1_0.type.transport.Flow;
+import org.apache.qpid.amqp_1_0.type.transport.Role;
+import org.apache.qpid.amqp_1_0.type.transport.Transfer;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class SendingLinkEndpoint extends LinkEndpoint<SendingLinkListener>
+{
+
+    private UnsignedInteger _lastDeliveryId;
+    private Binary _lastDeliveryTag;
+    private Map<Binary, UnsignedInteger> _unsettledMap = new HashMap<Binary, UnsignedInteger>();
+    private Binary _transactionId;
+
+    public SendingLinkEndpoint(final SessionEndpoint sessionEndpoint, String name)
+    {
+        this(sessionEndpoint, name, null);
+    }
+
+    public SendingLinkEndpoint(final SessionEndpoint sessionEndpoint, String name, Map<Binary, Outcome> unsettled)
+    {
+        super(sessionEndpoint, name, unsettled);
+        init();
+    }
+
+    public SendingLinkEndpoint(final SessionEndpoint sessionEndpoint, final Attach attach)
+    {
+        super(sessionEndpoint, attach);
+        setSendingSettlementMode(attach.getSndSettleMode());
+        setReceivingSettlementMode(attach.getRcvSettleMode());
+        init();
+    }
+
+    private void init()
+    {
+        setDeliveryCount(UnsignedInteger.valueOf(0));
+        setAvailable(UnsignedInteger.valueOf(0));
+        setLinkEventListener(SendingLinkListener.DEFAULT);
+    }
+
+    @Override public Role getRole()
+    {
+        return Role.SENDER;
+    }
+
+    public boolean transfer(final Transfer xfr)
+    {
+        SessionEndpoint s = getSession();
+        int transferCount;
+        transferCount = _lastDeliveryTag == null ? 1 : 1;
+        xfr.setMessageFormat(UnsignedInteger.ZERO);
+        synchronized(getLock())
+        {
+
+            final int currentCredit = getLinkCredit().intValue() - transferCount;
+
+            if(currentCredit < 0)
+            {
+                return false;
+            }
+            else
+            {
+                setLinkCredit(UnsignedInteger.valueOf((int)currentCredit));
+            }
+
+            setDeliveryCount(UnsignedInteger.valueOf((getDeliveryCount().intValue() + transferCount)));
+
+            xfr.setHandle(getLocalHandle());
+
+            s.sendTransfer(xfr, this, !xfr.getDeliveryTag().equals(_lastDeliveryTag));
+
+            if(!Boolean.TRUE.equals(xfr.getSettled()))
+            {
+                _unsettledMap.put(xfr.getDeliveryTag(), xfr.getDeliveryId());
+            }
+        }
+
+        if(Boolean.TRUE.equals(xfr.getMore()))
+        {
+            _lastDeliveryTag = xfr.getDeliveryTag();
+        }
+        else
+        {
+            _lastDeliveryTag = null;
+        }
+
+        return true;
+    }
+
+
+    public void drained()
+    {
+        synchronized (getLock())
+        {
+            setDeliveryCount(getDeliveryCount().add(getLinkCredit()));
+            setLinkCredit(UnsignedInteger.ZERO);
+            sendFlow();
+        }
+    }
+
+    @Override
+    public void receiveFlow(final Flow flow)
+    {
+        super.receiveFlow(flow);
+        UnsignedInteger t = flow.getDeliveryCount();
+        UnsignedInteger c = flow.getLinkCredit();
+        setDrain(flow.getDrain());
+
+        Map options;
+        if((options = flow.getProperties()) != null)
+        {
+             _transactionId = (Binary) options.get(Symbol.valueOf("txn-id"));
+        }
+
+        if(t == null)
+        {
+            setLinkCredit(c);
+        }
+        else
+        {
+            UnsignedInteger limit = t.add(c);
+            if(limit.compareTo(getDeliveryCount())<=0)
+            {
+                setLinkCredit(UnsignedInteger.valueOf(0));
+            }
+            else
+            {
+                setLinkCredit(limit.subtract(getDeliveryCount()));
+            }
+        }
+        getLinkEventListener().flowStateChanged();
+
+    }
+
+    @Override
+    public void flowStateChanged()
+    {
+        getLinkEventListener().flowStateChanged();
+    }
+
+    public boolean hasCreditToSend()
+    {
+        UnsignedInteger linkCredit = getLinkCredit();
+        return linkCredit != null && (linkCredit.compareTo(UnsignedInteger.valueOf(0)) > 0)
+               && getSession().hasCreditToSend();
+    }
+
+    public void receiveDeliveryState(final Delivery unsettled,
+                                               final DeliveryState state,
+                                               final Boolean settled)
+    {
+        super.receiveDeliveryState(unsettled, state, settled);
+        if(settled)
+        {
+            _unsettledMap.remove(unsettled.getDeliveryTag());
+        }
+    }
+
+    public UnsignedInteger getLastDeliveryId()
+    {
+        return _lastDeliveryId;
+    }
+
+    public void setLastDeliveryId(final UnsignedInteger deliveryId)
+    {
+        _lastDeliveryId = deliveryId;
+    }
+
+    public void updateDisposition(final Binary deliveryTag, DeliveryState state, boolean settled)
+    {
+        synchronized(getLock())
+        {
+            UnsignedInteger deliveryId;
+            if(settled && (deliveryId = _unsettledMap.remove(deliveryTag))!=null)
+            {
+                settle(deliveryTag);
+                getSession().updateDisposition(getRole(), deliveryId, deliveryId, state, settled);
+            }
+
+        }
+    }
+
+    public Binary getTransactionId()
+    {
+        return _transactionId;
+    }
+
+}

Propchange: qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SendingLinkEndpoint.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SendingLinkListener.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SendingSessionHalfEndpoint.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SendingSessionHalfEndpoint.java?rev=1457505&r1=1457504&r2=1457505&view=diff
==============================================================================
--- qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SendingSessionHalfEndpoint.java (original)
+++ qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SendingSessionHalfEndpoint.java Sun Mar 17 18:03:37 2013
@@ -1,26 +1,26 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.amqp_1_0.transport;
-
-public class SendingSessionHalfEndpoint extends SessionHalfEndpoint
-{
-}
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.amqp_1_0.transport;
+
+public class SendingSessionHalfEndpoint extends SessionHalfEndpoint
+{
+}

Propchange: qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SendingSessionHalfEndpoint.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SequenceNumber.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SequenceNumber.java?rev=1457505&r1=1457504&r2=1457505&view=diff
==============================================================================
--- qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SequenceNumber.java (original)
+++ qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SequenceNumber.java Sun Mar 17 18:03:37 2013
@@ -1,110 +1,110 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.amqp_1_0.transport;
-
-public class SequenceNumber implements Comparable<SequenceNumber>, Cloneable
-{
-    private int _seqNo;
-
-    public SequenceNumber(int seqNo)
-    {
-        _seqNo = seqNo;
-    }
-
-    public SequenceNumber incr()
-    {
-        _seqNo++;
-        return this;
-    }
-
-    public SequenceNumber decr()
-    {
-        _seqNo--;
-        return this;
-    }
-
-    public static SequenceNumber add(SequenceNumber a, int i)
-    {
-        return a.clone().add(i);
-    }
-
-    public static SequenceNumber subtract(SequenceNumber a, int i)
-    {
-        return a.clone().add(-i);
-    }
-
-    private SequenceNumber add(int i)
-    {
-        _seqNo+=i;
-        return this;
-    }
-
-    @Override
-    public boolean equals(Object o)
-    {
-        if (this == o)
-        {
-            return true;
-        }
-        if (o == null || getClass() != o.getClass())
-        {
-            return false;
-        }
-
-        SequenceNumber that = (SequenceNumber) o;
-
-        if (_seqNo != that._seqNo)
-        {
-            return false;
-        }
-
-        return true;
-    }
-
-    @Override
-    public int hashCode()
-    {
-        return _seqNo;
-    }
-
-    public int compareTo(SequenceNumber o)
-    {
-        return _seqNo - o._seqNo;
-    }
-
-    @Override
-    public SequenceNumber clone()
-    {
-        return new SequenceNumber(_seqNo);
-    }
-
-    @Override
-    public String toString()
-    {
-        return "SN{" + _seqNo + '}';
-    }
-
-    public int intValue()
-    {
-        return _seqNo;
-    }
-}
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.amqp_1_0.transport;
+
+public class SequenceNumber implements Comparable<SequenceNumber>, Cloneable
+{
+    private int _seqNo;
+
+    public SequenceNumber(int seqNo)
+    {
+        _seqNo = seqNo;
+    }
+
+    public SequenceNumber incr()
+    {
+        _seqNo++;
+        return this;
+    }
+
+    public SequenceNumber decr()
+    {
+        _seqNo--;
+        return this;
+    }
+
+    public static SequenceNumber add(SequenceNumber a, int i)
+    {
+        return a.clone().add(i);
+    }
+
+    public static SequenceNumber subtract(SequenceNumber a, int i)
+    {
+        return a.clone().add(-i);
+    }
+
+    private SequenceNumber add(int i)
+    {
+        _seqNo+=i;
+        return this;
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o)
+        {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass())
+        {
+            return false;
+        }
+
+        SequenceNumber that = (SequenceNumber) o;
+
+        if (_seqNo != that._seqNo)
+        {
+            return false;
+        }
+
+        return true;
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return _seqNo;
+    }
+
+    public int compareTo(SequenceNumber o)
+    {
+        return _seqNo - o._seqNo;
+    }
+
+    @Override
+    public SequenceNumber clone()
+    {
+        return new SequenceNumber(_seqNo);
+    }
+
+    @Override
+    public String toString()
+    {
+        return "SN{" + _seqNo + '}';
+    }
+
+    public int intValue()
+    {
+        return _seqNo;
+    }
+}

Propchange: qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SequenceNumber.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionAttachment.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionAttachment.java?rev=1457505&r1=1457504&r2=1457505&view=diff
==============================================================================
--- qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionAttachment.java (original)
+++ qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionAttachment.java Sun Mar 17 18:03:37 2013
@@ -1,92 +1,92 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.amqp_1_0.transport;
-
-public class SessionAttachment
-{
-    private final SessionEndpoint _sessionEndpoint;
-    private final ConnectionEndpoint _connectionEndpoint;
-    private final short _channel;
-
-    public SessionAttachment(SessionEndpoint sessionEndpoint, ConnectionEndpoint connectionEndpoint, short channel)
-    {
-        _sessionEndpoint = sessionEndpoint;
-        _connectionEndpoint = connectionEndpoint;
-        _channel = channel;
-    }
-
-    public SessionEndpoint getSessionEndpoint()
-    {
-        return _sessionEndpoint;
-    }
-
-    public ConnectionEndpoint getConnectionEndpoint()
-    {
-        return _connectionEndpoint;
-    }
-
-    public short getChannel()
-    {
-        return _channel;
-    }
-
-    @Override
-    public boolean equals(Object o)
-    {
-        if (this == o)
-        {
-            return true;
-        }
-        if (o == null || getClass() != o.getClass())
-        {
-            return false;
-        }
-
-        SessionAttachment that = (SessionAttachment) o;
-
-        if (_channel != that._channel)
-        {
-            return false;
-        }
-        if (_connectionEndpoint != null
-            ? !_connectionEndpoint.equals(that._connectionEndpoint)
-            : that._connectionEndpoint != null)
-        {
-            return false;
-        }
-        if (_sessionEndpoint != null ? !_sessionEndpoint.equals(that._sessionEndpoint) : that._sessionEndpoint != null)
-        {
-            return false;
-        }
-
-        return true;
-    }
-
-    @Override
-    public int hashCode()
-    {
-        int result = _sessionEndpoint != null ? _sessionEndpoint.hashCode() : 0;
-        result = 31 * result + (_connectionEndpoint != null ? _connectionEndpoint.hashCode() : 0);
-        result = 31 * result + (int) _channel;
-        return result;
-    }
-}
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.amqp_1_0.transport;
+
+public class SessionAttachment
+{
+    private final SessionEndpoint _sessionEndpoint;
+    private final ConnectionEndpoint _connectionEndpoint;
+    private final short _channel;
+
+    public SessionAttachment(SessionEndpoint sessionEndpoint, ConnectionEndpoint connectionEndpoint, short channel)
+    {
+        _sessionEndpoint = sessionEndpoint;
+        _connectionEndpoint = connectionEndpoint;
+        _channel = channel;
+    }
+
+    public SessionEndpoint getSessionEndpoint()
+    {
+        return _sessionEndpoint;
+    }
+
+    public ConnectionEndpoint getConnectionEndpoint()
+    {
+        return _connectionEndpoint;
+    }
+
+    public short getChannel()
+    {
+        return _channel;
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o)
+        {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass())
+        {
+            return false;
+        }
+
+        SessionAttachment that = (SessionAttachment) o;
+
+        if (_channel != that._channel)
+        {
+            return false;
+        }
+        if (_connectionEndpoint != null
+            ? !_connectionEndpoint.equals(that._connectionEndpoint)
+            : that._connectionEndpoint != null)
+        {
+            return false;
+        }
+        if (_sessionEndpoint != null ? !_sessionEndpoint.equals(that._sessionEndpoint) : that._sessionEndpoint != null)
+        {
+            return false;
+        }
+
+        return true;
+    }
+
+    @Override
+    public int hashCode()
+    {
+        int result = _sessionEndpoint != null ? _sessionEndpoint.hashCode() : 0;
+        result = 31 * result + (_connectionEndpoint != null ? _connectionEndpoint.hashCode() : 0);
+        result = 31 * result + (int) _channel;
+        return result;
+    }
+}

Propchange: qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionAttachment.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java?rev=1457505&r1=1457504&r2=1457505&view=diff
==============================================================================
--- qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java (original)
+++ qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java Sun Mar 17 18:03:37 2013
@@ -1,797 +1,806 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.amqp_1_0.transport;
-
-import org.apache.qpid.amqp_1_0.framing.OversizeFrameException;
-import org.apache.qpid.amqp_1_0.type.*;
-import org.apache.qpid.amqp_1_0.type.messaging.Source;
-import org.apache.qpid.amqp_1_0.type.messaging.Target;
-import org.apache.qpid.amqp_1_0.type.messaging.TerminusDurability;
-import org.apache.qpid.amqp_1_0.type.messaging.TerminusExpiryPolicy;
-import org.apache.qpid.amqp_1_0.type.transaction.*;
-import org.apache.qpid.amqp_1_0.type.transaction.TxnCapability;
-import org.apache.qpid.amqp_1_0.type.transport.*;
-import org.apache.qpid.amqp_1_0.type.transport.Error;
-
-
-import java.nio.ByteBuffer;
-import java.util.*;
-
-public class SessionEndpoint
-{
-    private SessionState _state = SessionState.INACTIVE;
-
-    private final Map<String, LinkEndpoint> _linkMap = new HashMap<String, LinkEndpoint>();
-    private final Map<LinkEndpoint, UnsignedInteger> _localLinkEndpoints = new HashMap<LinkEndpoint, UnsignedInteger>();
-    private final Map<UnsignedInteger, LinkEndpoint> _remoteLinkEndpoints = new HashMap<UnsignedInteger, LinkEndpoint>();
-
-    private long _timeout;
-
-
-    private ConnectionEndpoint _connection;
-    private long _lastAttachedTime;
-
-    private short _receivingChannel;
-    private short _sendingChannel;
-
-    private LinkedHashMap<UnsignedInteger,Delivery> _outgoingUnsettled;
-    private LinkedHashMap<UnsignedInteger,Delivery> _incomingUnsettled;
-
-    // has to be a power of two
-    private static final int DEFAULT_SESSION_BUFFER_SIZE = 1 << 11;
-    private static final int BUFFER_SIZE_MASK = DEFAULT_SESSION_BUFFER_SIZE - 1;
-
-    private SequenceNumber _nextIncomingTransferId;
-    private SequenceNumber _nextOutgoingTransferId;
-
-    private int _nextOutgoingDeliveryId;
-
-    //private SequenceNumber _incomingLWM;
-    //private SequenceNumber _outgoingLWM;
-
-    private UnsignedInteger _outgoingSessionCredit;
-
-
-
-    private UnsignedInteger _initialOutgoingId;
-
-    private SessionEventListener _sessionEventListener = SessionEventListener.DEFAULT;
-
-    private int _availableIncomingCredit;
-    private int _availableOutgoingCredit;
-    private UnsignedInteger _lastSentIncomingLimit;
-
-    public SessionEndpoint(final ConnectionEndpoint connectionEndpoint)
-    {
-        this(connectionEndpoint, UnsignedInteger.valueOf(0));
-    }
-
-    public SessionEndpoint(final ConnectionEndpoint connectionEndpoint, Begin begin)
-    {
-        this(connectionEndpoint, UnsignedInteger.valueOf(0));
-        _state = SessionState.BEGIN_RECVD;
-        _nextIncomingTransferId = new SequenceNumber(begin.getNextOutgoingId().intValue());
-    }
-
-
-    public SessionEndpoint(final ConnectionEndpoint connectionEndpoint, UnsignedInteger nextOutgoingId)
-    {
-        _connection = connectionEndpoint;
-
-        _initialOutgoingId = nextOutgoingId;
-        _nextOutgoingTransferId = new SequenceNumber(nextOutgoingId.intValue());
-
-        _outgoingUnsettled = new LinkedHashMap<UnsignedInteger,Delivery>(DEFAULT_SESSION_BUFFER_SIZE);
-        _incomingUnsettled = new LinkedHashMap<UnsignedInteger, Delivery>(DEFAULT_SESSION_BUFFER_SIZE);
-        _availableIncomingCredit = DEFAULT_SESSION_BUFFER_SIZE;
-        _availableOutgoingCredit = DEFAULT_SESSION_BUFFER_SIZE;
-    }
-
-
-    public void setReceivingChannel(final short receivingChannel)
-    {
-        _receivingChannel = receivingChannel;
-        switch(_state)
-        {
-            case INACTIVE:
-                _state = SessionState.BEGIN_RECVD;
-                break;
-            case BEGIN_SENT:
-                _state = SessionState.ACTIVE;
-                break;
-            default:
-                // TODO error
-
-        }
-    }
-
-
-    public void setSendingChannel(final short sendingChannel)
-    {
-        _sendingChannel = sendingChannel;
-        switch(_state)
-        {
-            case INACTIVE:
-                _state = SessionState.BEGIN_SENT;
-                break;
-            case BEGIN_RECVD:
-                _state = SessionState.ACTIVE;
-                break;
-            default:
-                // TODO error
-
-        }
-    }
-
-    public SessionState getState()
-    {
-        return _state;
-    }
-
-    public void end()
-    {
-        end(null);
-    }
-
-    public void end(final End end)
-    {
-        synchronized(getLock())
-        {
-            switch(_state)
-            {
-                case END_SENT:
-                    _state = SessionState.ENDED;
-                    break;
-                case ACTIVE:
-                    detachLinks();
-                    _sessionEventListener.remoteEnd(end);
-                    short sendChannel = getSendingChannel();
-                    _connection.sendEnd(sendChannel, new End());
-                    _state = end == null ? SessionState.END_SENT : SessionState.ENDED;
-                    break;
-                default:
-                    sendChannel = getSendingChannel();
-                    End reply = new End();
-                    Error error = new Error();
-                    error.setCondition(AmqpError.ILLEGAL_STATE);
-                    error.setDescription("END called on Session which has not been opened");
-                    reply.setError(error);
-                    _connection.sendEnd(sendChannel, reply);
-                    break;
-
-
-            }
-            getLock().notifyAll();
-        }
-    }
-
-    private void detachLinks()
-    {
-        Collection<UnsignedInteger> handles = new ArrayList<UnsignedInteger>(_remoteLinkEndpoints.keySet());
-        for(UnsignedInteger handle : handles)
-        {
-            detach(handle, null);
-        }
-    }
-
-    public short getSendingChannel()
-    {
-        return _sendingChannel;
-    }
-
-
-    public void receiveAttach(final Attach attach)
-    {
-        if(_state == SessionState.ACTIVE)
-        {
-            UnsignedInteger handle = attach.getHandle();
-            if(_remoteLinkEndpoints.containsKey(handle))
-            {
-                // TODO - Error - handle busy?
-            }
-            else
-            {
-                LinkEndpoint endpoint = getLinkMap().get(attach.getName());
-                if(endpoint == null)
-                {
-                    endpoint = attach.getRole() == Role.RECEIVER
-                               ? new SendingLinkEndpoint(this, attach)
-                               : new ReceivingLinkEndpoint(this, attach);
-
-                    // TODO : fix below - distinguish between local and remote owned
-                    endpoint.setSource(attach.getSource());
-                    endpoint.setTarget(attach.getTarget());
-
-
-                }
-
-                if(attach.getRole() == Role.SENDER)
-                {
-                    endpoint.setDeliveryCount(attach.getInitialDeliveryCount());
-                }
-
-                _remoteLinkEndpoints.put(handle, endpoint);
-
-                if(!_localLinkEndpoints.containsKey(endpoint))
-                {
-                    UnsignedInteger localHandle = findNextAvailableHandle();
-                    endpoint.setLocalHandle(localHandle);
-                    _localLinkEndpoints.put(endpoint, localHandle);
-
-                    _sessionEventListener.remoteLinkCreation(endpoint);
-
-
-                }
-                else
-                {
-                    endpoint.receiveAttach(attach);
-                }
-            }
-        }
-    }
-
-    private void send(final FrameBody frameBody)
-    {
-        _connection.send(this.getSendingChannel(), frameBody);
-    }
-
-
-    private int send(final FrameBody frameBody, ByteBuffer payload)
-    {
-        return _connection.send(this.getSendingChannel(), frameBody, payload);
-    }
-
-    private UnsignedInteger findNextAvailableHandle()
-    {
-        int i = 0;
-        do
-        {
-            if(!_localLinkEndpoints.containsValue(UnsignedInteger.valueOf(i)))
-            {
-                return UnsignedInteger.valueOf(i);
-            }
-        } while(++i != 0);
-
-        // TODO
-        throw new RuntimeException();
-    }
-
-    public void receiveDetach(final Detach detach)
-    {
-        UnsignedInteger handle = detach.getHandle();
-        detach(handle, detach);
-    }
-
-    private void detach(UnsignedInteger handle, Detach detach)
-    {
-        if(_remoteLinkEndpoints.containsKey(handle))
-        {
-            LinkEndpoint endpoint = _remoteLinkEndpoints.remove(handle);
-
-            endpoint.remoteDetached(detach);
-
-            _localLinkEndpoints.remove(endpoint);
-
-
-        }
-        else
-        {
-            // TODO
-        }
-    }
-
-    public void receiveTransfer(final Transfer transfer)
-    {
-        synchronized(getLock())
-        {
-            _nextIncomingTransferId.incr();
-/*
-            _availableIncomingCredit--;
-*/
-
-            UnsignedInteger handle = transfer.getHandle();
-
-
-
-            LinkEndpoint endpoint = _remoteLinkEndpoints.get(handle);
-
-            if(endpoint == null)
-            {
-                //TODO - error unknown link
-                System.err.println("Unknown endpoint " + transfer);
-
-            }
-
-            UnsignedInteger deliveryId = transfer.getDeliveryId();
-            if(deliveryId == null)
-            {
-                deliveryId = ((ReceivingLinkEndpoint)endpoint).getLastDeliveryId();
-            }
-            
-            Delivery delivery = _incomingUnsettled.get(deliveryId);
-            if(delivery == null)
-            {
-                delivery = new Delivery(transfer, endpoint);
-                _incomingUnsettled.put(deliveryId,delivery);
-                if(delivery.isSettled() || Boolean.TRUE.equals(transfer.getAborted()))
-                {
-/*
-                    _availableIncomingCredit++;
-*/
-                }
-                
-                if(Boolean.TRUE.equals(transfer.getMore()))
-                {
-                    ((ReceivingLinkEndpoint)endpoint).setLastDeliveryId(transfer.getDeliveryId());
-                }
-            }
-            else
-            {
-                if(delivery.getDeliveryId().equals(deliveryId))
-                {
-                    delivery.addTransfer(transfer);
-                    if(delivery.isSettled())
-                    {
-/*
-                        _availableIncomingCredit++;
-*/
-                    }
-                    else if(Boolean.TRUE.equals(transfer.getAborted()))
-                    {
-/*
-                        _availableIncomingCredit += delivery.getTransfers().size();
-*/
-                    }
-
-                    if(!Boolean.TRUE.equals(transfer.getMore()))
-                    {
-                        ((ReceivingLinkEndpoint)endpoint).setLastDeliveryId(null);
-                    }
-                }
-                else
-                {
-                    // TODO - error
-                    System.err.println("Incorrect transfer id " + transfer);
-                }
-            }
-
-            if(endpoint != null)
-            {
-                endpoint.receiveTransfer(transfer, delivery);
-            }
-
-            if((delivery.isComplete() && delivery.isSettled() || Boolean.TRUE.equals(transfer.getAborted())))
-            {
-                _incomingUnsettled.remove(deliveryId);
-            }
-        }
-    }
-
-    public void receiveFlow(final Flow flow)
-    {
-
-        synchronized(getLock())
-        {
-            UnsignedInteger handle = flow.getHandle();
-            LinkEndpoint endpoint = handle == null ? null : _remoteLinkEndpoints.get(handle);
-
-            final UnsignedInteger nextOutgoingId = flow.getNextIncomingId() == null ? _initialOutgoingId : flow.getNextIncomingId();
-            int limit = (nextOutgoingId.intValue() + flow.getIncomingWindow().intValue());
-            _outgoingSessionCredit = UnsignedInteger.valueOf(limit - _nextOutgoingTransferId.intValue());
-
-            if(endpoint != null)
-            {
-                endpoint.receiveFlow( flow );
-            }
-            else
-            {
-                for(LinkEndpoint le : _remoteLinkEndpoints.values())
-                {
-                    le.flowStateChanged();
-                }
-            }
-
-            getLock().notifyAll();
-        }
-
-
-    }
-
-    public void receiveDisposition(final Disposition disposition)
-    {
-        Role dispositionRole = disposition.getRole();
-
-        LinkedHashMap<UnsignedInteger, Delivery> unsettledTransfers;
-
-        if(dispositionRole == Role.RECEIVER)
-        {
-            unsettledTransfers = _outgoingUnsettled;
-        }
-        else
-        {
-            unsettledTransfers = _incomingUnsettled;
-
-        }
-
-        UnsignedInteger deliveryId = disposition.getFirst();
-        UnsignedInteger last = disposition.getLast();
-        if(last == null)
-        {
-            last = deliveryId;
-        }
-
-
-                while(deliveryId.compareTo(last)<=0)
-                {
-
-                    Delivery delivery = unsettledTransfers.get(deliveryId);
-                    if(delivery != null)
-                    {
-                        delivery.getLinkEndpoint().receiveDeliveryState(delivery,
-                                                                   disposition.getState(),
-                                                                   disposition.getSettled());
-                    }
-                    deliveryId = deliveryId.add(UnsignedInteger.ONE);
-                }
-                if(disposition.getSettled())
-                {
-                    checkSendFlow();
-                }
-
-    }
-
-    private void checkSendFlow()
-    {
-        //TODO
-    }
-
-    public SendingLinkEndpoint createSendingLinkEndpoint(final String name, final String targetAddr, final String sourceAddr)
-    {
-        return createSendingLinkEndpoint(name, targetAddr, sourceAddr, null);
-    }
-
-    public SendingLinkEndpoint createSendingLinkEndpoint(final String name, final String targetAddr, final String sourceAddr, Map<Binary, Outcome> unsettled)
-    {
-        return createSendingLinkEndpoint(name, targetAddr, sourceAddr, false, unsettled);
-    }
-
-    public SendingLinkEndpoint createSendingLinkEndpoint(final String name, final String targetAddr,
-                                                         final String sourceAddr, boolean durable,
-                                                         Map<Binary, Outcome> unsettled)
-    {
-
-        Source source = new Source();
-        source.setAddress(sourceAddr);
-        Target target = new Target();
-        target.setAddress(targetAddr);
-        if(durable)
-        {
-            target.setDurable(TerminusDurability.UNSETTLED_STATE);
-            target.setExpiryPolicy(TerminusExpiryPolicy.NEVER);
-        }
-
-        return createSendingLinkEndpoint(name, source, target, unsettled);
-
-    }
-
-    public SendingLinkEndpoint createSendingLinkEndpoint(final String name, final Source source, final org.apache.qpid.amqp_1_0.type.Target target)
-    {
-        return createSendingLinkEndpoint(name, source, target, null);
-    }
-
-    public SendingLinkEndpoint createSendingLinkEndpoint(final String name, final Source source, final org.apache.qpid.amqp_1_0.type.Target target, Map<Binary, Outcome> unsettled)
-    {
-        SendingLinkEndpoint endpoint = new SendingLinkEndpoint(this, name, unsettled);
-        endpoint.setSource(source);
-        endpoint.setTarget(target);
-        UnsignedInteger handle = findNextAvailableHandle();
-        _localLinkEndpoints.put(endpoint, handle);
-        endpoint.setLocalHandle(handle);
-        getLinkMap().put(name, endpoint);
-
-        return endpoint;
-    }
-
-    public void sendAttach(Attach attach)
-    {
-        send(attach);
-    }
-
-    public void sendTransfer(final Transfer xfr, SendingLinkEndpoint endpoint, boolean newDelivery)
-    {
-        _nextOutgoingTransferId.incr();
-        UnsignedInteger deliveryId;
-        if(newDelivery)
-        {
-            deliveryId = UnsignedInteger.valueOf(_nextOutgoingDeliveryId++);
-            endpoint.setLastDeliveryId(deliveryId);
-        }
-        else
-        {
-            deliveryId = endpoint.getLastDeliveryId();
-        }
-        xfr.setDeliveryId(deliveryId);
-
-        if(!Boolean.TRUE.equals(xfr.getSettled()))
-        {
-            Delivery delivery;
-            if((delivery = _outgoingUnsettled.get(deliveryId))== null)
-            {
-                delivery = new Delivery(xfr, endpoint);
-                _outgoingUnsettled.put(deliveryId, delivery);
-
-            }
-            else
-            {
-                delivery.addTransfer(xfr);
-            }
-            _outgoingSessionCredit = _outgoingSessionCredit.subtract(UnsignedInteger.ONE);
-            endpoint.addUnsettled(delivery);
-
-        }
-
-        try
-        {
-            ByteBuffer payload = xfr.getPayload();
-            int payloadSent = send(xfr, payload);
-
-            if(payload != null && payloadSent < payload.remaining())
-            {
-                payload = payload.duplicate();
-try
-{
-                payload.position(payload.position()+payloadSent);
-}
-catch(IllegalArgumentException e)
-{
-    System.err.println("UNEXPECTED");
-    System.err.println("Payload Position: " + payload.position());
-    System.err.println("Payload Sent: " + payloadSent);
-    System.err.println("Payload Remaining: " + payload.remaining());
-    throw e;
-
-}
-
-                Transfer secondTransfer = new Transfer();
-
-                secondTransfer.setDeliveryTag(xfr.getDeliveryTag());
-                secondTransfer.setHandle(xfr.getHandle());
-                secondTransfer.setSettled(xfr.getSettled());
-                secondTransfer.setState(xfr.getState());
-                secondTransfer.setMessageFormat(xfr.getMessageFormat());
-                secondTransfer.setPayload(payload);
-
-                sendTransfer(secondTransfer, endpoint, false);
-
-            }
-        }
-        catch(OversizeFrameException e)
-        {
-            e.printStackTrace();
-        }
-
-    }
-
-    public Object getLock()
-    {
-        return _connection.getLock();
-    }
-
-    public ReceivingLinkEndpoint createReceivingLinkEndpoint(final String name,
-                                                             String targetAddr,
-                                                             String sourceAddr,
-                                                             UnsignedInteger initialCredit,
-                                                             final DistributionMode distributionMode)
-    {
-        Source source = new Source();
-        source.setAddress(sourceAddr);
-        source.setDistributionMode(distributionMode);
-        Target target = new Target();
-        target.setAddress(targetAddr);
-
-        return createReceivingLinkEndpoint(name, target, source, initialCredit);
-    }
-
-    public ReceivingLinkEndpoint createReceivingLinkEndpoint(final String name,
-                                                             Target target,
-                                                             Source source,
-                                                             UnsignedInteger initialCredit)
-    {
-        ReceivingLinkEndpoint endpoint = new ReceivingLinkEndpoint(this, name);
-        endpoint.setLinkCredit(initialCredit);
-        endpoint.setSource(source);
-        endpoint.setTarget(target);
-        UnsignedInteger handle = findNextAvailableHandle();
-        _localLinkEndpoints.put(endpoint, handle);
-        endpoint.setLocalHandle(handle);
-        getLinkMap().put(name, endpoint);
-
-        return endpoint;
-
-    }
-
-    public void updateDisposition(final Role role,
-                                  final UnsignedInteger first,
-                                  final UnsignedInteger last,
-                                  final DeliveryState state,
-                                  final boolean settled)
-    {
-
-
-        Disposition disposition = new Disposition();
-        disposition.setRole(role);
-        disposition.setFirst(first);
-        disposition.setLast(last);
-        disposition.setSettled(settled);
-
-        disposition.setState(state);
-
-
-        if(settled)
-        {
-            if(role == Role.RECEIVER)
-            {
-                SequenceNumber pos = new SequenceNumber(first.intValue());
-                SequenceNumber end = new SequenceNumber(last.intValue());
-                while(pos.compareTo(end)<=0)
-                {
-                    Delivery d = _incomingUnsettled.remove(new UnsignedInteger(pos.intValue()));
-
-/*
-                    _availableIncomingCredit += d.getTransfers().size();
-*/
-
-                    pos.incr();
-                }
-            }
-        }
-
-        send(disposition);
-        checkSendFlow();
-    }
-
-    public void settle(Role role, final UnsignedInteger deliveryId)
-    {
-        if(role == Role.RECEIVER)
-        {
-            Delivery d = _incomingUnsettled.remove(deliveryId);
-            if(d != null)
-            {
-/*
-                _availableIncomingCredit += d.getTransfers().size();
-*/
-            }
-        }
-        else
-        {
-            Delivery d = _outgoingUnsettled.remove(deliveryId);
-/*            if(d != null)
-            {
-                _availableOutgoingCredit += d.getTransfers().size();
-
-            }*/
-        }
-
-    }
-
-    public void sendFlow()
-    {
-        sendFlow(new Flow());
-    }
-    public void sendFlow(final Flow flow)
-    {
-        final int nextIncomingId = _nextIncomingTransferId.intValue();
-        flow.setNextIncomingId(UnsignedInteger.valueOf(nextIncomingId));
-        flow.setIncomingWindow(UnsignedInteger.valueOf(_availableIncomingCredit));
-        _lastSentIncomingLimit = UnsignedInteger.valueOf(nextIncomingId + _availableIncomingCredit);
-
-        flow.setNextOutgoingId(UnsignedInteger.valueOf(_nextOutgoingTransferId.intValue()));
-        flow.setOutgoingWindow(UnsignedInteger.valueOf(_availableOutgoingCredit));
-        send(flow);
-    }
-
-    public void sendFlowConditional()
-    {
-        UnsignedInteger clientsCredit = _lastSentIncomingLimit.subtract(UnsignedInteger.valueOf(_nextIncomingTransferId.intValue()));
-        int i = UnsignedInteger.valueOf(_availableIncomingCredit).subtract(clientsCredit).compareTo(clientsCredit);
-        if(i >=0)
-        {
-            sendFlow();
-        }
-
-    }
-
-    public void sendDetach(Detach detach)
-    {
-        send(detach);
-
-    }
-
-    void doEnd(End end)
-    {
-    }
-
-    public void setNextIncomingId(final UnsignedInteger nextIncomingId)
-    {
-        _nextIncomingTransferId = new SequenceNumber(nextIncomingId.intValue());
-
-    }
-
-    public void setOutgoingSessionCredit(final UnsignedInteger outgoingSessionCredit)
-    {
-        _outgoingSessionCredit = outgoingSessionCredit;
-    }
-
-    public UnsignedInteger getNextOutgoingId()
-    {
-        return UnsignedInteger.valueOf(_nextOutgoingTransferId.intValue());
-    }
-
-    public UnsignedInteger getOutgoingWindowSize()
-    {
-        return UnsignedInteger.valueOf(_availableOutgoingCredit);
-    }
-
-    public boolean hasCreditToSend()
-    {
-        boolean b = _outgoingSessionCredit != null && _outgoingSessionCredit.intValue() > 0;
-        boolean b1 = getOutgoingWindowSize() != null && getOutgoingWindowSize().compareTo(UnsignedInteger.ZERO) > 0;
-        return b && b1;
-    }
-
-    public UnsignedInteger getIncomingWindowSize()
-    {
-        return UnsignedInteger.valueOf(_availableIncomingCredit);
-    }
-
-    public SessionEventListener getSessionEventListener()
-    {
-        return _sessionEventListener;
-    }
-
-    public void setSessionEventListener(final SessionEventListener sessionEventListener)
-    {
-        _sessionEventListener = sessionEventListener;
-    }
-
-    public ConnectionEndpoint getConnection()
-    {
-        return _connection;
-    }
-
-    public SendingLinkEndpoint createTransactionController(String name, TxnCapability... capabilities)
-    {
-        Coordinator coordinator = new Coordinator();
-        coordinator.setCapabilities(capabilities);
-
-        Source src = new Source();
-
-        return createSendingLinkEndpoint(name, src, coordinator);
-    }
-
-    Map<String, LinkEndpoint> getLinkMap()
-    {
-        return _linkMap;
-    }
-
-
-}
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.amqp_1_0.transport;
+
+import org.apache.qpid.amqp_1_0.framing.OversizeFrameException;
+import org.apache.qpid.amqp_1_0.type.*;
+import org.apache.qpid.amqp_1_0.type.messaging.Source;
+import org.apache.qpid.amqp_1_0.type.messaging.Target;
+import org.apache.qpid.amqp_1_0.type.messaging.TerminusDurability;
+import org.apache.qpid.amqp_1_0.type.messaging.TerminusExpiryPolicy;
+import org.apache.qpid.amqp_1_0.type.transaction.*;
+import org.apache.qpid.amqp_1_0.type.transaction.TxnCapability;
+import org.apache.qpid.amqp_1_0.type.transport.*;
+import org.apache.qpid.amqp_1_0.type.transport.Error;
+
+
+import java.nio.ByteBuffer;
+import java.util.*;
+
+public class SessionEndpoint
+{
+    private SessionState _state = SessionState.INACTIVE;
+
+    private final Map<String, LinkEndpoint> _linkMap = new HashMap<String, LinkEndpoint>();
+    private final Map<LinkEndpoint, UnsignedInteger> _localLinkEndpoints = new HashMap<LinkEndpoint, UnsignedInteger>();
+    private final Map<UnsignedInteger, LinkEndpoint> _remoteLinkEndpoints = new HashMap<UnsignedInteger, LinkEndpoint>();
+
+    private long _timeout;
+
+
+    private ConnectionEndpoint _connection;
+    private long _lastAttachedTime;
+
+    private short _receivingChannel;
+    private short _sendingChannel;
+
+    private LinkedHashMap<UnsignedInteger,Delivery> _outgoingUnsettled;
+    private LinkedHashMap<UnsignedInteger,Delivery> _incomingUnsettled;
+
+    // has to be a power of two
+    private static final int DEFAULT_SESSION_BUFFER_SIZE = 1 << 11;
+    private static final int BUFFER_SIZE_MASK = DEFAULT_SESSION_BUFFER_SIZE - 1;
+
+    private SequenceNumber _nextIncomingTransferId;
+    private SequenceNumber _nextOutgoingTransferId;
+
+    private int _nextOutgoingDeliveryId;
+
+    //private SequenceNumber _incomingLWM;
+    //private SequenceNumber _outgoingLWM;
+
+    private UnsignedInteger _outgoingSessionCredit;
+
+
+
+    private UnsignedInteger _initialOutgoingId;
+
+    private SessionEventListener _sessionEventListener = SessionEventListener.DEFAULT;
+
+    private int _availableIncomingCredit;
+    private int _availableOutgoingCredit;
+    private UnsignedInteger _lastSentIncomingLimit;
+
+    public SessionEndpoint(final ConnectionEndpoint connectionEndpoint)
+    {
+        this(connectionEndpoint, UnsignedInteger.valueOf(0));
+    }
+
+    public SessionEndpoint(final ConnectionEndpoint connectionEndpoint, Begin begin)
+    {
+        this(connectionEndpoint, UnsignedInteger.valueOf(0));
+        _state = SessionState.BEGIN_RECVD;
+        _nextIncomingTransferId = new SequenceNumber(begin.getNextOutgoingId().intValue());
+    }
+
+
+    public SessionEndpoint(final ConnectionEndpoint connectionEndpoint, UnsignedInteger nextOutgoingId)
+    {
+        _connection = connectionEndpoint;
+
+        _initialOutgoingId = nextOutgoingId;
+        _nextOutgoingTransferId = new SequenceNumber(nextOutgoingId.intValue());
+
+        _outgoingUnsettled = new LinkedHashMap<UnsignedInteger,Delivery>(DEFAULT_SESSION_BUFFER_SIZE);
+        _incomingUnsettled = new LinkedHashMap<UnsignedInteger, Delivery>(DEFAULT_SESSION_BUFFER_SIZE);
+        _availableIncomingCredit = DEFAULT_SESSION_BUFFER_SIZE;
+        _availableOutgoingCredit = DEFAULT_SESSION_BUFFER_SIZE;
+    }
+
+
+    public void setReceivingChannel(final short receivingChannel)
+    {
+        _receivingChannel = receivingChannel;
+        switch(_state)
+        {
+            case INACTIVE:
+                _state = SessionState.BEGIN_RECVD;
+                break;
+            case BEGIN_SENT:
+                _state = SessionState.ACTIVE;
+                break;
+            default:
+                // TODO error
+
+        }
+    }
+
+
+    public void setSendingChannel(final short sendingChannel)
+    {
+        _sendingChannel = sendingChannel;
+        switch(_state)
+        {
+            case INACTIVE:
+                _state = SessionState.BEGIN_SENT;
+                break;
+            case BEGIN_RECVD:
+                _state = SessionState.ACTIVE;
+                break;
+            default:
+                // TODO error
+
+        }
+    }
+
+    public SessionState getState()
+    {
+        return _state;
+    }
+
+    public void end()
+    {
+        end(null);
+    }
+
+    public void end(final End end)
+    {
+        synchronized(getLock())
+        {
+            switch(_state)
+            {
+                case END_SENT:
+                    _state = SessionState.ENDED;
+                    break;
+                case ACTIVE:
+                    detachLinks();
+                    _sessionEventListener.remoteEnd(end);
+                    short sendChannel = getSendingChannel();
+                    _connection.sendEnd(sendChannel, new End());
+                    _state = end == null ? SessionState.END_SENT : SessionState.ENDED;
+                    break;
+                default:
+                    sendChannel = getSendingChannel();
+                    End reply = new End();
+                    Error error = new Error();
+                    error.setCondition(AmqpError.ILLEGAL_STATE);
+                    error.setDescription("END called on Session which has not been opened");
+                    reply.setError(error);
+                    _connection.sendEnd(sendChannel, reply);
+                    break;
+
+
+            }
+            getLock().notifyAll();
+        }
+    }
+
+    private void detachLinks()
+    {
+        Collection<UnsignedInteger> handles = new ArrayList<UnsignedInteger>(_remoteLinkEndpoints.keySet());
+        for(UnsignedInteger handle : handles)
+        {
+            detach(handle, null);
+        }
+    }
+
+    public short getSendingChannel()
+    {
+        return _sendingChannel;
+    }
+
+
+    public void receiveAttach(final Attach attach)
+    {
+        if(_state == SessionState.ACTIVE)
+        {
+            UnsignedInteger handle = attach.getHandle();
+            if(_remoteLinkEndpoints.containsKey(handle))
+            {
+                // TODO - Error - handle busy?
+            }
+            else
+            {
+                LinkEndpoint endpoint = getLinkMap().get(attach.getName());
+                if(endpoint == null)
+                {
+                    endpoint = attach.getRole() == Role.RECEIVER
+                               ? new SendingLinkEndpoint(this, attach)
+                               : new ReceivingLinkEndpoint(this, attach);
+
+                    // TODO : fix below - distinguish between local and remote owned
+                    endpoint.setSource(attach.getSource());
+                    endpoint.setTarget(attach.getTarget());
+
+
+                }
+
+                if(attach.getRole() == Role.SENDER)
+                {
+                    endpoint.setDeliveryCount(attach.getInitialDeliveryCount());
+                }
+
+                _remoteLinkEndpoints.put(handle, endpoint);
+
+                if(!_localLinkEndpoints.containsKey(endpoint))
+                {
+                    UnsignedInteger localHandle = findNextAvailableHandle();
+                    endpoint.setLocalHandle(localHandle);
+                    _localLinkEndpoints.put(endpoint, localHandle);
+
+                    _sessionEventListener.remoteLinkCreation(endpoint);
+
+
+                }
+                else
+                {
+                    endpoint.receiveAttach(attach);
+                }
+            }
+        }
+    }
+
+    private void send(final FrameBody frameBody)
+    {
+        _connection.send(this.getSendingChannel(), frameBody);
+    }
+
+
+    private int send(final FrameBody frameBody, ByteBuffer payload)
+    {
+        return _connection.send(this.getSendingChannel(), frameBody, payload);
+    }
+
+    private UnsignedInteger findNextAvailableHandle()
+    {
+        int i = 0;
+        do
+        {
+            if(!_localLinkEndpoints.containsValue(UnsignedInteger.valueOf(i)))
+            {
+                return UnsignedInteger.valueOf(i);
+            }
+        } while(++i != 0);
+
+        // TODO
+        throw new RuntimeException();
+    }
+
+    public void receiveDetach(final Detach detach)
+    {
+        UnsignedInteger handle = detach.getHandle();
+        detach(handle, detach);
+    }
+
+    private void detach(UnsignedInteger handle, Detach detach)
+    {
+        if(_remoteLinkEndpoints.containsKey(handle))
+        {
+            LinkEndpoint endpoint = _remoteLinkEndpoints.remove(handle);
+
+            endpoint.remoteDetached(detach);
+
+            _localLinkEndpoints.remove(endpoint);
+
+
+        }
+        else
+        {
+            // TODO
+        }
+    }
+
+    public void receiveTransfer(final Transfer transfer)
+    {
+        synchronized(getLock())
+        {
+            _nextIncomingTransferId.incr();
+/*
+            _availableIncomingCredit--;
+*/
+
+            UnsignedInteger handle = transfer.getHandle();
+
+
+
+            LinkEndpoint endpoint = _remoteLinkEndpoints.get(handle);
+
+            if(endpoint == null)
+            {
+                //TODO - error unknown link
+                System.err.println("Unknown endpoint " + transfer);
+
+            }
+
+            UnsignedInteger deliveryId = transfer.getDeliveryId();
+            if(deliveryId == null)
+            {
+                deliveryId = ((ReceivingLinkEndpoint)endpoint).getLastDeliveryId();
+            }
+
+            Delivery delivery = _incomingUnsettled.get(deliveryId);
+            if(delivery == null)
+            {
+                delivery = new Delivery(transfer, endpoint);
+                _incomingUnsettled.put(deliveryId,delivery);
+                if(delivery.isSettled() || Boolean.TRUE.equals(transfer.getAborted()))
+                {
+/*
+                    _availableIncomingCredit++;
+*/
+                }
+
+                if(Boolean.TRUE.equals(transfer.getMore()))
+                {
+                    ((ReceivingLinkEndpoint)endpoint).setLastDeliveryId(transfer.getDeliveryId());
+                }
+            }
+            else
+            {
+                if(delivery.getDeliveryId().equals(deliveryId))
+                {
+                    delivery.addTransfer(transfer);
+                    if(delivery.isSettled())
+                    {
+/*
+                        _availableIncomingCredit++;
+*/
+                    }
+                    else if(Boolean.TRUE.equals(transfer.getAborted()))
+                    {
+/*
+                        _availableIncomingCredit += delivery.getTransfers().size();
+*/
+                    }
+
+                    if(!Boolean.TRUE.equals(transfer.getMore()))
+                    {
+                        ((ReceivingLinkEndpoint)endpoint).setLastDeliveryId(null);
+                    }
+                }
+                else
+                {
+                    // TODO - error
+                    System.err.println("Incorrect transfer id " + transfer);
+                }
+            }
+
+            if(endpoint != null)
+            {
+                endpoint.receiveTransfer(transfer, delivery);
+            }
+
+            if((delivery.isComplete() && delivery.isSettled() || Boolean.TRUE.equals(transfer.getAborted())))
+            {
+                _incomingUnsettled.remove(deliveryId);
+            }
+        }
+    }
+
+    public void receiveFlow(final Flow flow)
+    {
+
+        synchronized(getLock())
+        {
+            UnsignedInteger handle = flow.getHandle();
+            LinkEndpoint endpoint = handle == null ? null : _remoteLinkEndpoints.get(handle);
+
+            final UnsignedInteger nextOutgoingId = flow.getNextIncomingId() == null ? _initialOutgoingId : flow.getNextIncomingId();
+            int limit = (nextOutgoingId.intValue() + flow.getIncomingWindow().intValue());
+            _outgoingSessionCredit = UnsignedInteger.valueOf(limit - _nextOutgoingTransferId.intValue());
+
+            if(endpoint != null)
+            {
+                endpoint.receiveFlow( flow );
+            }
+            else
+            {
+                for(LinkEndpoint le : _remoteLinkEndpoints.values())
+                {
+                    le.flowStateChanged();
+                }
+            }
+
+            getLock().notifyAll();
+        }
+
+
+    }
+
+    public void receiveDisposition(final Disposition disposition)
+    {
+        Role dispositionRole = disposition.getRole();
+
+        LinkedHashMap<UnsignedInteger, Delivery> unsettledTransfers;
+
+        if(dispositionRole == Role.RECEIVER)
+        {
+            unsettledTransfers = _outgoingUnsettled;
+        }
+        else
+        {
+            unsettledTransfers = _incomingUnsettled;
+
+        }
+
+        UnsignedInteger deliveryId = disposition.getFirst();
+        UnsignedInteger last = disposition.getLast();
+        if(last == null)
+        {
+            last = deliveryId;
+        }
+
+
+                while(deliveryId.compareTo(last)<=0)
+                {
+
+                    Delivery delivery = unsettledTransfers.get(deliveryId);
+                    if(delivery != null)
+                    {
+                        delivery.getLinkEndpoint().receiveDeliveryState(delivery,
+                                                                   disposition.getState(),
+                                                                   disposition.getSettled());
+                    }
+                    deliveryId = deliveryId.add(UnsignedInteger.ONE);
+                }
+                if(disposition.getSettled())
+                {
+                    checkSendFlow();
+                }
+
+    }
+
+    private void checkSendFlow()
+    {
+        //TODO
+    }
+
+    public SendingLinkEndpoint createSendingLinkEndpoint(final String name, final String targetAddr, final String sourceAddr)
+    {
+        return createSendingLinkEndpoint(name, targetAddr, sourceAddr, null);
+    }
+
+    public SendingLinkEndpoint createSendingLinkEndpoint(final String name, final String targetAddr, final String sourceAddr, Map<Binary, Outcome> unsettled)
+    {
+        return createSendingLinkEndpoint(name, targetAddr, sourceAddr, false, unsettled);
+    }
+
+    public SendingLinkEndpoint createSendingLinkEndpoint(final String name, final String targetAddr,
+                                                         final String sourceAddr, boolean durable,
+                                                         Map<Binary, Outcome> unsettled)
+    {
+
+        Source source = new Source();
+        source.setAddress(sourceAddr);
+        Target target = new Target();
+        target.setAddress(targetAddr);
+        if(durable)
+        {
+            target.setDurable(TerminusDurability.UNSETTLED_STATE);
+            target.setExpiryPolicy(TerminusExpiryPolicy.NEVER);
+        }
+
+        return createSendingLinkEndpoint(name, source, target, unsettled);
+
+    }
+
+    public SendingLinkEndpoint createSendingLinkEndpoint(final String name, final Source source, final org.apache.qpid.amqp_1_0.type.Target target)
+    {
+        return createSendingLinkEndpoint(name, source, target, null);
+    }
+
+    public SendingLinkEndpoint createSendingLinkEndpoint(final String name, final Source source, final org.apache.qpid.amqp_1_0.type.Target target, Map<Binary, Outcome> unsettled)
+    {
+        SendingLinkEndpoint endpoint = new SendingLinkEndpoint(this, name, unsettled);
+        endpoint.setSource(source);
+        endpoint.setTarget(target);
+        UnsignedInteger handle = findNextAvailableHandle();
+        _localLinkEndpoints.put(endpoint, handle);
+        endpoint.setLocalHandle(handle);
+        getLinkMap().put(name, endpoint);
+
+        return endpoint;
+    }
+
+    public void sendAttach(Attach attach)
+    {
+        send(attach);
+    }
+
+    public void sendTransfer(final Transfer xfr, SendingLinkEndpoint endpoint, boolean newDelivery)
+    {
+        _nextOutgoingTransferId.incr();
+        UnsignedInteger deliveryId;
+        if(newDelivery)
+        {
+            deliveryId = UnsignedInteger.valueOf(_nextOutgoingDeliveryId++);
+            endpoint.setLastDeliveryId(deliveryId);
+        }
+        else
+        {
+            deliveryId = endpoint.getLastDeliveryId();
+        }
+        xfr.setDeliveryId(deliveryId);
+
+        if(!Boolean.TRUE.equals(xfr.getSettled()))
+        {
+            Delivery delivery;
+            if((delivery = _outgoingUnsettled.get(deliveryId))== null)
+            {
+                delivery = new Delivery(xfr, endpoint);
+                _outgoingUnsettled.put(deliveryId, delivery);
+
+            }
+            else
+            {
+                delivery.addTransfer(xfr);
+            }
+            _outgoingSessionCredit = _outgoingSessionCredit.subtract(UnsignedInteger.ONE);
+            endpoint.addUnsettled(delivery);
+
+        }
+
+        try
+        {
+            ByteBuffer payload = xfr.getPayload();
+            int payloadSent = send(xfr, payload);
+
+            if(payload != null && payloadSent < payload.remaining())
+            {
+                payload = payload.duplicate();
+try
+{
+                payload.position(payload.position()+payloadSent);
+}
+catch(IllegalArgumentException e)
+{
+    System.err.println("UNEXPECTED");
+    System.err.println("Payload Position: " + payload.position());
+    System.err.println("Payload Sent: " + payloadSent);
+    System.err.println("Payload Remaining: " + payload.remaining());
+    throw e;
+
+}
+
+                Transfer secondTransfer = new Transfer();
+
+                secondTransfer.setDeliveryTag(xfr.getDeliveryTag());
+                secondTransfer.setHandle(xfr.getHandle());
+                secondTransfer.setSettled(xfr.getSettled());
+                secondTransfer.setState(xfr.getState());
+                secondTransfer.setMessageFormat(xfr.getMessageFormat());
+                secondTransfer.setPayload(payload);
+
+                sendTransfer(secondTransfer, endpoint, false);
+
+            }
+        }
+        catch(OversizeFrameException e)
+        {
+            e.printStackTrace();
+        }
+
+    }
+
+    public Object getLock()
+    {
+        return _connection.getLock();
+    }
+
+    public ReceivingLinkEndpoint createReceivingLinkEndpoint(final String name,
+                                                             String targetAddr,
+                                                             String sourceAddr,
+                                                             UnsignedInteger initialCredit,
+                                                             final DistributionMode distributionMode)
+    {
+        Source source = new Source();
+        source.setAddress(sourceAddr);
+        source.setDistributionMode(distributionMode);
+        Target target = new Target();
+        target.setAddress(targetAddr);
+
+        return createReceivingLinkEndpoint(name, target, source, initialCredit);
+    }
+
+    public ReceivingLinkEndpoint createReceivingLinkEndpoint(final String name,
+                                                             Target target,
+                                                             Source source,
+                                                             UnsignedInteger initialCredit)
+    {
+        ReceivingLinkEndpoint endpoint = new ReceivingLinkEndpoint(this, name);
+        endpoint.setLinkCredit(initialCredit);
+        endpoint.setSource(source);
+        endpoint.setTarget(target);
+        UnsignedInteger handle = findNextAvailableHandle();
+        _localLinkEndpoints.put(endpoint, handle);
+        endpoint.setLocalHandle(handle);
+        getLinkMap().put(name, endpoint);
+
+        return endpoint;
+
+    }
+
+    public void updateDisposition(final Role role,
+                                  final UnsignedInteger first,
+                                  final UnsignedInteger last,
+                                  final DeliveryState state,
+                                  final boolean settled)
+    {
+
+
+        Disposition disposition = new Disposition();
+        disposition.setRole(role);
+        disposition.setFirst(first);
+        disposition.setLast(last);
+        disposition.setSettled(settled);
+
+        disposition.setState(state);
+
+
+        if(settled)
+        {
+            if(role == Role.RECEIVER)
+            {
+                SequenceNumber pos = new SequenceNumber(first.intValue());
+                SequenceNumber end = new SequenceNumber(last.intValue());
+                while(pos.compareTo(end)<=0)
+                {
+                    Delivery d = _incomingUnsettled.remove(new UnsignedInteger(pos.intValue()));
+
+/*
+                    _availableIncomingCredit += d.getTransfers().size();
+*/
+
+                    pos.incr();
+                }
+            }
+        }
+
+        send(disposition);
+        checkSendFlow();
+    }
+
+    public void settle(Role role, final UnsignedInteger deliveryId)
+    {
+        if(role == Role.RECEIVER)
+        {
+            Delivery d = _incomingUnsettled.remove(deliveryId);
+            if(d != null)
+            {
+/*
+                _availableIncomingCredit += d.getTransfers().size();
+*/
+            }
+        }
+        else
+        {
+            Delivery d = _outgoingUnsettled.remove(deliveryId);
+/*            if(d != null)
+            {
+                _availableOutgoingCredit += d.getTransfers().size();
+
+            }*/
+        }
+
+    }
+
+    public void sendFlow()
+    {
+        sendFlow(new Flow());
+    }
+    public void sendFlow(final Flow flow)
+    {
+        final int nextIncomingId = _nextIncomingTransferId.intValue();
+        flow.setNextIncomingId(UnsignedInteger.valueOf(nextIncomingId));
+        flow.setIncomingWindow(UnsignedInteger.valueOf(_availableIncomingCredit));
+        _lastSentIncomingLimit = UnsignedInteger.valueOf(nextIncomingId + _availableIncomingCredit);
+
+        flow.setNextOutgoingId(UnsignedInteger.valueOf(_nextOutgoingTransferId.intValue()));
+        flow.setOutgoingWindow(UnsignedInteger.valueOf(_availableOutgoingCredit));
+        send(flow);
+    }
+
+    public void sendFlowConditional()
+    {
+        UnsignedInteger clientsCredit = _lastSentIncomingLimit.subtract(UnsignedInteger.valueOf(_nextIncomingTransferId.intValue()));
+        int i = UnsignedInteger.valueOf(_availableIncomingCredit).subtract(clientsCredit).compareTo(clientsCredit);
+        if(i >=0)
+        {
+            sendFlow();
+        }
+
+    }
+
+    public void sendDetach(Detach detach)
+    {
+        send(detach);
+
+    }
+
+    void doEnd(End end)
+    {
+    }
+
+    public void setNextIncomingId(final UnsignedInteger nextIncomingId)
+    {
+        _nextIncomingTransferId = new SequenceNumber(nextIncomingId.intValue());
+
+    }
+
+    public void setOutgoingSessionCredit(final UnsignedInteger outgoingSessionCredit)
+    {
+        _outgoingSessionCredit = outgoingSessionCredit;
+    }
+
+    public UnsignedInteger getNextOutgoingId()
+    {
+        return UnsignedInteger.valueOf(_nextOutgoingTransferId.intValue());
+    }
+
+    public UnsignedInteger getOutgoingWindowSize()
+    {
+        return UnsignedInteger.valueOf(_availableOutgoingCredit);
+    }
+
+    public boolean hasCreditToSend()
+    {
+        boolean b = _outgoingSessionCredit != null && _outgoingSessionCredit.intValue() > 0;
+        boolean b1 = getOutgoingWindowSize() != null && getOutgoingWindowSize().compareTo(UnsignedInteger.ZERO) > 0;
+        return b && b1;
+    }
+
+    public UnsignedInteger getIncomingWindowSize()
+    {
+        return UnsignedInteger.valueOf(_availableIncomingCredit);
+    }
+
+    public SessionEventListener getSessionEventListener()
+    {
+        return _sessionEventListener;
+    }
+
+    public void setSessionEventListener(final SessionEventListener sessionEventListener)
+    {
+        _sessionEventListener = sessionEventListener;
+    }
+
+    public ConnectionEndpoint getConnection()
+    {
+        return _connection;
+    }
+
+    public SendingLinkEndpoint createTransactionController(String name, TxnCapability... capabilities)
+    {
+        Coordinator coordinator = new Coordinator();
+        coordinator.setCapabilities(capabilities);
+
+        Source src = new Source();
+
+        return createSendingLinkEndpoint(name, src, coordinator);
+    }
+
+    Map<String, LinkEndpoint> getLinkMap()
+    {
+        return _linkMap;
+    }
+
+
+    public boolean isEnded()
+    {
+        return _state == SessionState.ENDED || _connection.isClosed();
+    }
+
+    public boolean isActive()
+    {
+        return _state == SessionState.ACTIVE;
+    }
+}

Propchange: qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEventListener.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionHalfEndpoint.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionHalfEndpoint.java?rev=1457505&r1=1457504&r2=1457505&view=diff
==============================================================================
--- qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionHalfEndpoint.java (original)
+++ qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionHalfEndpoint.java Sun Mar 17 18:03:37 2013
@@ -1,26 +1,26 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.amqp_1_0.transport;
-
-public class SessionHalfEndpoint
-{
-}
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.amqp_1_0.transport;
+
+public class SessionHalfEndpoint
+{
+}

Propchange: qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionHalfEndpoint.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionState.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionState.java?rev=1457505&r1=1457504&r2=1457505&view=diff
==============================================================================
--- qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionState.java (original)
+++ qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionState.java Sun Mar 17 18:03:37 2013
@@ -1,34 +1,34 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.amqp_1_0.transport;
-
-public enum SessionState
-{
-    ACTIVE,
-    INACTIVE,
-    BEGIN_SENT,
-    BEGIN_RECVD,
-    END_SENT,
-    END_RECVD,
-    ENDED;
-
-}
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.amqp_1_0.transport;
+
+public enum SessionState
+{
+    ACTIVE,
+    INACTIVE,
+    BEGIN_SENT,
+    BEGIN_RECVD,
+    END_SENT,
+    END_RECVD,
+    ENDED;
+
+}

Propchange: qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionState.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/StateChangeListener.java
------------------------------------------------------------------------------
    svn:eol-style = native



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message