qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rgodf...@apache.org
Subject svn commit: r1526202 [3/3] - in /qpid/trunk/qpid/java/amqp-1-0-client: example/src/main/java/org/apache/qpid/amqp_1_0/client/ src/main/java/org/apache/qpid/amqp_1_0/client/
Date Wed, 25 Sep 2013 15:15:18 GMT
Modified: qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java?rev=1526202&r1=1526201&r2=1526202&view=diff
==============================================================================
--- qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java (original)
+++ qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java Wed Sep 25 15:15:18 2013
@@ -1,452 +1,452 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.amqp_1_0.client;
-
-import org.apache.qpid.amqp_1_0.messaging.SectionEncoder;
-import org.apache.qpid.amqp_1_0.transport.DeliveryStateHandler;
-import org.apache.qpid.amqp_1_0.transport.LinkEndpoint;
-import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint;
-import org.apache.qpid.amqp_1_0.transport.SendingLinkListener;
-import org.apache.qpid.amqp_1_0.type.*;
-import org.apache.qpid.amqp_1_0.type.Source;
-import org.apache.qpid.amqp_1_0.type.Target;
-import org.apache.qpid.amqp_1_0.type.messaging.*;
-import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState;
-import org.apache.qpid.amqp_1_0.type.transport.*;
-
-import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.apache.qpid.amqp_1_0.type.transport.Error;
-
-public class Sender implements DeliveryStateHandler
-{
-    private SendingLinkEndpoint _endpoint;
-    private int _id;
-    private Session _session;
-    private int _windowSize;
-    private Map<Binary, OutcomeAction> _outcomeActions = Collections.synchronizedMap(new HashMap<Binary, OutcomeAction>());
-    private boolean _closed;
-    private Error _error;
-    private Runnable _remoteErrorTask;
-
-    public Sender(final Session session, final String linkName, final String targetAddr, final String sourceAddr)
-            throws SenderCreationException, ConnectionClosedException
-    {
-        this(session, linkName, targetAddr, sourceAddr, false);
-    }
-
-    public Sender(final Session session, final String linkName, final String targetAddr, final String sourceAddr,
-                  boolean synchronous)
-            throws SenderCreationException, ConnectionClosedException
-    {
-        this(session, linkName, targetAddr, sourceAddr, synchronous ? 1 : 0);
-    }
-
-    public Sender(final Session session, final String linkName, final String targetAddr, final String sourceAddr,
-                  int window) throws SenderCreationException, ConnectionClosedException
-    {
-        this(session, linkName, targetAddr, sourceAddr, window, AcknowledgeMode.ALO);
-    }
-
-
-    public Sender(final Session session, final String linkName, final org.apache.qpid.amqp_1_0.type.messaging.Target target, final org.apache.qpid.amqp_1_0.type.messaging.Source source,
-                  int window) throws SenderCreationException, ConnectionClosedException
-    {
-        this(session, linkName, target, source, window, AcknowledgeMode.ALO);
-    }
-
-    public Sender(final Session session, final String linkName, final String targetAddr, final String sourceAddr,
-                  int window, AcknowledgeMode mode)
-            throws SenderCreationException, ConnectionClosedException
-    {
-        this(session, linkName, targetAddr, sourceAddr, window, mode, null);
-    }
-
-    public Sender(final Session session, final String linkName, final org.apache.qpid.amqp_1_0.type.messaging.Target target, final org.apache.qpid.amqp_1_0.type.messaging.Source source,
-                  int window, AcknowledgeMode mode)
-            throws SenderCreationException, ConnectionClosedException
-    {
-        this(session, linkName, target, source, window, mode, null);
-    }
-
-    public Sender(final Session session, final String linkName, final String targetAddr, final String sourceAddr,
-                  int window, AcknowledgeMode mode, Map<Binary, Outcome> unsettled)
-            throws SenderCreationException, ConnectionClosedException
-    {
-        this(session, linkName, targetAddr, sourceAddr, window, mode, false, unsettled);
-    }
-
-    public Sender(final Session session, final String linkName, final String targetAddr, final String sourceAddr,
-                  int window, AcknowledgeMode mode, boolean isDurable, Map<Binary, Outcome> unsettled)
-            throws SenderCreationException, ConnectionClosedException
-    {
-        this(session, linkName, createTarget(targetAddr, isDurable), createSource(sourceAddr), window, mode, unsettled);
-    }
-
-    private static org.apache.qpid.amqp_1_0.type.messaging.Source createSource(final String sourceAddr)
-    {
-        org.apache.qpid.amqp_1_0.type.messaging.Source source = new org.apache.qpid.amqp_1_0.type.messaging.Source();
-        source.setAddress(sourceAddr);
-        return source;
-    }
-
-    private static org.apache.qpid.amqp_1_0.type.messaging.Target createTarget(final String targetAddr, final boolean isDurable)
-    {
-        org.apache.qpid.amqp_1_0.type.messaging.Target target = new org.apache.qpid.amqp_1_0.type.messaging.Target();
-        target.setAddress(targetAddr);
-        if(isDurable)
-        {
-            target.setDurable(TerminusDurability.UNSETTLED_STATE);
-            target.setExpiryPolicy(TerminusExpiryPolicy.NEVER);
-        }
-        return target;
-    }
-
-    public Sender(final Session session, final String linkName, final org.apache.qpid.amqp_1_0.type.messaging.Target target, final org.apache.qpid.amqp_1_0.type.messaging.Source source,
-                  int window, AcknowledgeMode mode, Map<Binary, Outcome> unsettled)
-            throws SenderCreationException, ConnectionClosedException
-    {
-
-        _session = session;
-        session.getConnection().checkNotClosed();
-        _endpoint = session.getEndpoint().createSendingLinkEndpoint(linkName,
-                                                                    source, target, unsettled);
-
-
-        switch(mode)
-        {
-            case ALO:
-                _endpoint.setSendingSettlementMode(SenderSettleMode.UNSETTLED);
-                _endpoint.setReceivingSettlementMode(ReceiverSettleMode.FIRST);
-                break;
-            case AMO:
-                _endpoint.setSendingSettlementMode(SenderSettleMode.SETTLED);
-                break;
-            case EO:
-                _endpoint.setSendingSettlementMode(SenderSettleMode.UNSETTLED);
-                _endpoint.setReceivingSettlementMode(ReceiverSettleMode.SECOND);
-                break;
-
-        }
-        _endpoint.setDeliveryStateHandler(this);
-        _endpoint.attach();
-        _windowSize = window;
-
-        synchronized(_endpoint.getLock())
-        {
-            while(!(_endpoint.isAttached() || _endpoint.isDetached()))
-            {
-                try
-                {
-                    _endpoint.getLock().wait();
-                }
-                catch (InterruptedException e)
-                {
-                    throw new SenderCreationException(e);
-                }
-            }
-            if(_endpoint.getTarget()== null)
-            {
-                throw new SenderCreationException("Peer did not create remote endpoint for link, target: " + target.getAddress());
-            };
-        }
-
-        _endpoint.setLinkEventListener(new SendingLinkListener.DefaultLinkEventListener()
-        {
-
-            @Override
-            public void remoteDetached(final LinkEndpoint endpoint, final Detach detach)
-            {
-                _error = detach.getError();
-                if(_error != null)
-                {
-                    remoteError();
-                }
-                super.remoteDetached(endpoint, detach);
-            }
-        });
-    }
-
-    public Source getSource()
-    {
-        return _endpoint.getSource();
-    }
-
-    public Target getTarget()
-    {
-        return _endpoint.getTarget();
-    }
-
-    public void send(Message message) throws LinkDetachedException
-    {
-        send(message, null, null);
-    }
-
-    public void send(Message message, final OutcomeAction action) throws LinkDetachedException
-    {
-        send(message, null, action);
-    }
-
-    public void send(Message message, final Transaction txn) throws LinkDetachedException
-    {
-        send(message, txn, null);
-    }
-
-    public void send(Message message, final Transaction txn, OutcomeAction action) throws LinkDetachedException
-    {
-
-        List<Section> sections = message.getPayload();
-
-        Transfer xfr = new Transfer();
-
-        if(sections != null && !sections.isEmpty())
-        {
-            SectionEncoder encoder = _session.getSectionEncoder();
-            encoder.reset();
-
-            int sectionNumber = 0;
-            for(Section section : sections)
-            {
-                encoder.encodeObject(section);
-            }
-
-
-            Binary encoding = encoder.getEncoding();
-            ByteBuffer payload = encoding.asByteBuffer();
-            xfr.setPayload(payload);
-        }
-        if(message.getDeliveryTag() == null)
-        {
-            message.setDeliveryTag(new Binary(String.valueOf(_id++).getBytes()));
-        }
-        if(message.isResume())
-        {
-            xfr.setResume(Boolean.TRUE);
-        }
-        if(message.getDeliveryState() != null)
-        {
-            xfr.setState(message.getDeliveryState());
-        }
-
-        xfr.setDeliveryTag(message.getDeliveryTag());
-        //xfr.setSettled(_windowSize ==0);
-        if(txn != null)
-        {
-            xfr.setSettled(false);
-            TransactionalState deliveryState = new TransactionalState();
-            deliveryState.setTxnId(txn.getTxnId());
-            xfr.setState(deliveryState);
-        }
-        else
-        {
-            xfr.setSettled(message.getSettled() || _endpoint.getSendingSettlementMode() == SenderSettleMode.SETTLED);
-        }
-        final Object lock = _endpoint.getLock();
-        synchronized(lock)
-        {
-            while(!_endpoint.hasCreditToSend() && !_endpoint.isDetached())
-            {
-                try
-                {
-                    lock.wait();
-                }
-                catch (InterruptedException e)
-                {
-                    e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
-                }
-            }
-            if(_endpoint.isDetached())
-            {
-                throw new LinkDetachedException(_error);
-            }
-            if(action != null)
-            {
-                _outcomeActions.put(message.getDeliveryTag(), action);
-            }
-            _endpoint.transfer(xfr);
-            //TODO - rationalise sending of flows
-            // _endpoint.sendFlow();
-        }
-
-        if(_windowSize != 0)
-        {
-            synchronized(lock)
-            {
-
-
-                while(_endpoint.getUnsettledCount() >= _windowSize)
-                {
-                    try
-                    {
-                        lock.wait();
-                    }
-                    catch (InterruptedException e)
-                    {
-                        e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
-                    }
-                }
-            }
-
-        }
-
-
-    }
-
-    public void close() throws SenderClosingException
-    {
-
-        if(_windowSize != 0)
-        {
-            synchronized(_endpoint.getLock())
-            {
-
-
-                while(_endpoint.getUnsettledCount() > 0)
-                {
-                    try
-                    {
-                        _endpoint.getLock().wait();
-                    }
-                    catch (InterruptedException e)
-                    {
-                        e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
-                    }
-                }
-            }
-
-        }
-        _session.removeSender(this);
-        _endpoint.setSource(null);
-        _endpoint.detach();
-        _closed = true;
-
-        synchronized(_endpoint.getLock())
-        {
-            while(!_endpoint.isDetached())
-            {
-                try
-                {
-                    _endpoint.getLock().wait();
-                }
-                catch (InterruptedException e)
-                {
-                    throw new SenderClosingException(e);
-                }
-            }
-        }
-    }
-
-    public boolean isClosed()
-    {
-        return _closed;
-    }
-
-    public void handle(Binary deliveryTag, DeliveryState state, Boolean settled)
-    {
-        if(state instanceof Outcome)
-        {
-            OutcomeAction action;
-            if((action = _outcomeActions.remove(deliveryTag)) != null)
-            {
-                action.onOutcome(deliveryTag, (Outcome) state);
-            }
-            if(!Boolean.TRUE.equals(settled))
-            {
-                _endpoint.updateDisposition(deliveryTag, state, true);
-            }
-        }
-        else if(state instanceof TransactionalState)
-        {
-            OutcomeAction action;
-
-            if((action = _outcomeActions.remove(deliveryTag)) != null)
-            {
-                action.onOutcome(deliveryTag, ((TransactionalState) state).getOutcome());
-            }
-
-        }
-    }
-
-    public SendingLinkEndpoint getEndpoint()
-    {
-        return _endpoint;
-    }
-
-    public Map<Binary, DeliveryState> getRemoteUnsettled()
-    {
-        return _endpoint.getInitialUnsettledMap();
-    }
-
-    public Session getSession()
-    {
-        return _session;
-    }
-
-
-    private void remoteError()
-    {
-        if(_remoteErrorTask != null)
-        {
-            _remoteErrorTask.run();
-        }
-    }
-
-
-    public void setRemoteErrorListener(Runnable listener)
-    {
-        _remoteErrorTask = listener;
-    }
-
-    public Error getError()
-    {
-        return _error;
-    }
-
-    public class SenderCreationException extends Exception
-    {
-        public SenderCreationException(Throwable e)
-        {
-            super(e);
-        }
-
-        public SenderCreationException(String e)
-        {
-            super(e);
-
-        }
-    }
-
-    public class SenderClosingException extends Exception
-    {
-        public SenderClosingException(Throwable e)
-        {
-            super(e);
-        }
-    }
-
-    public static interface OutcomeAction
-    {
-        public void onOutcome(Binary deliveryTag, Outcome outcome);
-    }
-}
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.amqp_1_0.client;
+
+import org.apache.qpid.amqp_1_0.messaging.SectionEncoder;
+import org.apache.qpid.amqp_1_0.transport.DeliveryStateHandler;
+import org.apache.qpid.amqp_1_0.transport.LinkEndpoint;
+import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint;
+import org.apache.qpid.amqp_1_0.transport.SendingLinkListener;
+import org.apache.qpid.amqp_1_0.type.*;
+import org.apache.qpid.amqp_1_0.type.Source;
+import org.apache.qpid.amqp_1_0.type.Target;
+import org.apache.qpid.amqp_1_0.type.messaging.*;
+import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState;
+import org.apache.qpid.amqp_1_0.type.transport.*;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.qpid.amqp_1_0.type.transport.Error;
+
+public class Sender implements DeliveryStateHandler
+{
+    private SendingLinkEndpoint _endpoint;
+    private int _id;
+    private Session _session;
+    private int _windowSize;
+    private Map<Binary, OutcomeAction> _outcomeActions = Collections.synchronizedMap(new HashMap<Binary, OutcomeAction>());
+    private boolean _closed;
+    private Error _error;
+    private Runnable _remoteErrorTask;
+
+    public Sender(final Session session, final String linkName, final String targetAddr, final String sourceAddr)
+            throws SenderCreationException, ConnectionClosedException
+    {
+        this(session, linkName, targetAddr, sourceAddr, false);
+    }
+
+    public Sender(final Session session, final String linkName, final String targetAddr, final String sourceAddr,
+                  boolean synchronous)
+            throws SenderCreationException, ConnectionClosedException
+    {
+        this(session, linkName, targetAddr, sourceAddr, synchronous ? 1 : 0);
+    }
+
+    public Sender(final Session session, final String linkName, final String targetAddr, final String sourceAddr,
+                  int window) throws SenderCreationException, ConnectionClosedException
+    {
+        this(session, linkName, targetAddr, sourceAddr, window, AcknowledgeMode.ALO);
+    }
+
+
+    public Sender(final Session session, final String linkName, final org.apache.qpid.amqp_1_0.type.messaging.Target target, final org.apache.qpid.amqp_1_0.type.messaging.Source source,
+                  int window) throws SenderCreationException, ConnectionClosedException
+    {
+        this(session, linkName, target, source, window, AcknowledgeMode.ALO);
+    }
+
+    public Sender(final Session session, final String linkName, final String targetAddr, final String sourceAddr,
+                  int window, AcknowledgeMode mode)
+            throws SenderCreationException, ConnectionClosedException
+    {
+        this(session, linkName, targetAddr, sourceAddr, window, mode, null);
+    }
+
+    public Sender(final Session session, final String linkName, final org.apache.qpid.amqp_1_0.type.messaging.Target target, final org.apache.qpid.amqp_1_0.type.messaging.Source source,
+                  int window, AcknowledgeMode mode)
+            throws SenderCreationException, ConnectionClosedException
+    {
+        this(session, linkName, target, source, window, mode, null);
+    }
+
+    public Sender(final Session session, final String linkName, final String targetAddr, final String sourceAddr,
+                  int window, AcknowledgeMode mode, Map<Binary, Outcome> unsettled)
+            throws SenderCreationException, ConnectionClosedException
+    {
+        this(session, linkName, targetAddr, sourceAddr, window, mode, false, unsettled);
+    }
+
+    public Sender(final Session session, final String linkName, final String targetAddr, final String sourceAddr,
+                  int window, AcknowledgeMode mode, boolean isDurable, Map<Binary, Outcome> unsettled)
+            throws SenderCreationException, ConnectionClosedException
+    {
+        this(session, linkName, createTarget(targetAddr, isDurable), createSource(sourceAddr), window, mode, unsettled);
+    }
+
+    private static org.apache.qpid.amqp_1_0.type.messaging.Source createSource(final String sourceAddr)
+    {
+        org.apache.qpid.amqp_1_0.type.messaging.Source source = new org.apache.qpid.amqp_1_0.type.messaging.Source();
+        source.setAddress(sourceAddr);
+        return source;
+    }
+
+    private static org.apache.qpid.amqp_1_0.type.messaging.Target createTarget(final String targetAddr, final boolean isDurable)
+    {
+        org.apache.qpid.amqp_1_0.type.messaging.Target target = new org.apache.qpid.amqp_1_0.type.messaging.Target();
+        target.setAddress(targetAddr);
+        if(isDurable)
+        {
+            target.setDurable(TerminusDurability.UNSETTLED_STATE);
+            target.setExpiryPolicy(TerminusExpiryPolicy.NEVER);
+        }
+        return target;
+    }
+
+    public Sender(final Session session, final String linkName, final org.apache.qpid.amqp_1_0.type.messaging.Target target, final org.apache.qpid.amqp_1_0.type.messaging.Source source,
+                  int window, AcknowledgeMode mode, Map<Binary, Outcome> unsettled)
+            throws SenderCreationException, ConnectionClosedException
+    {
+
+        _session = session;
+        session.getConnection().checkNotClosed();
+        _endpoint = session.getEndpoint().createSendingLinkEndpoint(linkName,
+                                                                    source, target, unsettled);
+
+
+        switch(mode)
+        {
+            case ALO:
+                _endpoint.setSendingSettlementMode(SenderSettleMode.UNSETTLED);
+                _endpoint.setReceivingSettlementMode(ReceiverSettleMode.FIRST);
+                break;
+            case AMO:
+                _endpoint.setSendingSettlementMode(SenderSettleMode.SETTLED);
+                break;
+            case EO:
+                _endpoint.setSendingSettlementMode(SenderSettleMode.UNSETTLED);
+                _endpoint.setReceivingSettlementMode(ReceiverSettleMode.SECOND);
+                break;
+
+        }
+        _endpoint.setDeliveryStateHandler(this);
+        _endpoint.attach();
+        _windowSize = window;
+
+        synchronized(_endpoint.getLock())
+        {
+            while(!(_endpoint.isAttached() || _endpoint.isDetached()))
+            {
+                try
+                {
+                    _endpoint.getLock().wait();
+                }
+                catch (InterruptedException e)
+                {
+                    throw new SenderCreationException(e);
+                }
+            }
+            if(_endpoint.getTarget()== null)
+            {
+                throw new SenderCreationException("Peer did not create remote endpoint for link, target: " + target.getAddress());
+            };
+        }
+
+        _endpoint.setLinkEventListener(new SendingLinkListener.DefaultLinkEventListener()
+        {
+
+            @Override
+            public void remoteDetached(final LinkEndpoint endpoint, final Detach detach)
+            {
+                _error = detach.getError();
+                if(_error != null)
+                {
+                    remoteError();
+                }
+                super.remoteDetached(endpoint, detach);
+            }
+        });
+    }
+
+    public Source getSource()
+    {
+        return _endpoint.getSource();
+    }
+
+    public Target getTarget()
+    {
+        return _endpoint.getTarget();
+    }
+
+    public void send(Message message) throws LinkDetachedException
+    {
+        send(message, null, null);
+    }
+
+    public void send(Message message, final OutcomeAction action) throws LinkDetachedException
+    {
+        send(message, null, action);
+    }
+
+    public void send(Message message, final Transaction txn) throws LinkDetachedException
+    {
+        send(message, txn, null);
+    }
+
+    public void send(Message message, final Transaction txn, OutcomeAction action) throws LinkDetachedException
+    {
+
+        List<Section> sections = message.getPayload();
+
+        Transfer xfr = new Transfer();
+
+        if(sections != null && !sections.isEmpty())
+        {
+            SectionEncoder encoder = _session.getSectionEncoder();
+            encoder.reset();
+
+            int sectionNumber = 0;
+            for(Section section : sections)
+            {
+                encoder.encodeObject(section);
+            }
+
+
+            Binary encoding = encoder.getEncoding();
+            ByteBuffer payload = encoding.asByteBuffer();
+            xfr.setPayload(payload);
+        }
+        if(message.getDeliveryTag() == null)
+        {
+            message.setDeliveryTag(new Binary(String.valueOf(_id++).getBytes()));
+        }
+        if(message.isResume())
+        {
+            xfr.setResume(Boolean.TRUE);
+        }
+        if(message.getDeliveryState() != null)
+        {
+            xfr.setState(message.getDeliveryState());
+        }
+
+        xfr.setDeliveryTag(message.getDeliveryTag());
+        //xfr.setSettled(_windowSize ==0);
+        if(txn != null)
+        {
+            xfr.setSettled(false);
+            TransactionalState deliveryState = new TransactionalState();
+            deliveryState.setTxnId(txn.getTxnId());
+            xfr.setState(deliveryState);
+        }
+        else
+        {
+            xfr.setSettled(message.getSettled() || _endpoint.getSendingSettlementMode() == SenderSettleMode.SETTLED);
+        }
+        final Object lock = _endpoint.getLock();
+        synchronized(lock)
+        {
+            while(!_endpoint.hasCreditToSend() && !_endpoint.isDetached())
+            {
+                try
+                {
+                    lock.wait();
+                }
+                catch (InterruptedException e)
+                {
+                    e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
+                }
+            }
+            if(_endpoint.isDetached())
+            {
+                throw new LinkDetachedException(_error);
+            }
+            if(action != null)
+            {
+                _outcomeActions.put(message.getDeliveryTag(), action);
+            }
+            _endpoint.transfer(xfr);
+            //TODO - rationalise sending of flows
+            // _endpoint.sendFlow();
+        }
+
+        if(_windowSize != 0)
+        {
+            synchronized(lock)
+            {
+
+
+                while(_endpoint.getUnsettledCount() >= _windowSize)
+                {
+                    try
+                    {
+                        lock.wait();
+                    }
+                    catch (InterruptedException e)
+                    {
+                        e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
+                    }
+                }
+            }
+
+        }
+
+
+    }
+
+    public void close() throws SenderClosingException
+    {
+
+        if(_windowSize != 0)
+        {
+            synchronized(_endpoint.getLock())
+            {
+
+
+                while(_endpoint.getUnsettledCount() > 0)
+                {
+                    try
+                    {
+                        _endpoint.getLock().wait();
+                    }
+                    catch (InterruptedException e)
+                    {
+                        e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
+                    }
+                }
+            }
+
+        }
+        _session.removeSender(this);
+        _endpoint.setSource(null);
+        _endpoint.detach();
+        _closed = true;
+
+        synchronized(_endpoint.getLock())
+        {
+            while(!_endpoint.isDetached())
+            {
+                try
+                {
+                    _endpoint.getLock().wait();
+                }
+                catch (InterruptedException e)
+                {
+                    throw new SenderClosingException(e);
+                }
+            }
+        }
+    }
+
+    public boolean isClosed()
+    {
+        return _closed;
+    }
+
+    public void handle(Binary deliveryTag, DeliveryState state, Boolean settled)
+    {
+        if(state instanceof Outcome)
+        {
+            OutcomeAction action;
+            if((action = _outcomeActions.remove(deliveryTag)) != null)
+            {
+                action.onOutcome(deliveryTag, (Outcome) state);
+            }
+            if(!Boolean.TRUE.equals(settled))
+            {
+                _endpoint.updateDisposition(deliveryTag, state, true);
+            }
+        }
+        else if(state instanceof TransactionalState)
+        {
+            OutcomeAction action;
+
+            if((action = _outcomeActions.remove(deliveryTag)) != null)
+            {
+                action.onOutcome(deliveryTag, ((TransactionalState) state).getOutcome());
+            }
+
+        }
+    }
+
+    public SendingLinkEndpoint getEndpoint()
+    {
+        return _endpoint;
+    }
+
+    public Map<Binary, DeliveryState> getRemoteUnsettled()
+    {
+        return _endpoint.getInitialUnsettledMap();
+    }
+
+    public Session getSession()
+    {
+        return _session;
+    }
+
+
+    private void remoteError()
+    {
+        if(_remoteErrorTask != null)
+        {
+            _remoteErrorTask.run();
+        }
+    }
+
+
+    public void setRemoteErrorListener(Runnable listener)
+    {
+        _remoteErrorTask = listener;
+    }
+
+    public Error getError()
+    {
+        return _error;
+    }
+
+    public class SenderCreationException extends Exception
+    {
+        public SenderCreationException(Throwable e)
+        {
+            super(e);
+        }
+
+        public SenderCreationException(String e)
+        {
+            super(e);
+
+        }
+    }
+
+    public class SenderClosingException extends Exception
+    {
+        public SenderClosingException(Throwable e)
+        {
+            super(e);
+        }
+    }
+
+    public static interface OutcomeAction
+    {
+        public void onOutcome(Binary deliveryTag, Outcome outcome);
+    }
+}

Propchange: qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java?rev=1526202&r1=1526201&r2=1526202&view=diff
==============================================================================
--- qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java (original)
+++ qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java Wed Sep 25 15:15:18 2013
@@ -1,384 +1,384 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.amqp_1_0.client;
-
-import org.apache.qpid.amqp_1_0.messaging.SectionDecoder;
-import org.apache.qpid.amqp_1_0.messaging.SectionDecoderImpl;
-import org.apache.qpid.amqp_1_0.messaging.SectionEncoder;
-import org.apache.qpid.amqp_1_0.messaging.SectionEncoderImpl;
-import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint;
-import org.apache.qpid.amqp_1_0.transport.SessionEndpoint;
-import org.apache.qpid.amqp_1_0.transport.SessionState;
-import org.apache.qpid.amqp_1_0.type.*;
-import org.apache.qpid.amqp_1_0.type.messaging.Filter;
-import org.apache.qpid.amqp_1_0.type.messaging.Source;
-import org.apache.qpid.amqp_1_0.type.messaging.StdDistMode;
-import org.apache.qpid.amqp_1_0.type.messaging.Target;
-import org.apache.qpid.amqp_1_0.type.transaction.TxnCapability;
-import org.apache.qpid.amqp_1_0.type.transport.ReceiverSettleMode;
-import org.apache.qpid.amqp_1_0.type.transport.SenderSettleMode;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-
-public class Session
-{
-    private SessionEndpoint _endpoint;
-    private List<Receiver> _receivers = new ArrayList<Receiver>();
-    private List<Sender> _senders = new ArrayList<Sender>();
-    private SectionEncoder _sectionEncoder;
-    private SectionDecoder _sectionDecoder;
-    private TransactionController _sessionLocalTC;
-    private Connection _connection;
-
-    public Session(final Connection connection, String name)
-    {
-        _connection = connection;
-        _endpoint = connection.getEndpoint().createSession(name);
-        _sectionEncoder = new SectionEncoderImpl(connection.getEndpoint().getDescribedTypeRegistry());
-        _sectionDecoder = new SectionDecoderImpl(connection.getEndpoint().getDescribedTypeRegistry());
-    }
-
-
-    public synchronized Sender createSender(final String targetName)
-            throws Sender.SenderCreationException, ConnectionClosedException
-    {
-        return createSender(targetName, false);
-    }
-
-    public synchronized Sender createSender(final String targetName, boolean synchronous)
-            throws Sender.SenderCreationException, ConnectionClosedException
-    {
-
-        final String sourceName = UUID.randomUUID().toString();
-        return new Sender(this, targetName+"<-"+sourceName, targetName, sourceName, synchronous);
-
-    }
-
-    public synchronized Sender createSender(final String targetName, int window)
-            throws Sender.SenderCreationException, ConnectionClosedException
-    {
-         final String sourceName = UUID.randomUUID().toString();
-         return new Sender(this, targetName+"<-"+sourceName, targetName, sourceName, window);
-
-    }
-
-    public Sender createSender(String targetName, int window, AcknowledgeMode mode)
-            throws Sender.SenderCreationException, ConnectionClosedException
-    {
-
-        return createSender(targetName, window, mode, null);
-    }
-
-    public Sender createSender(String targetName, int window, AcknowledgeMode mode, String linkName)
-            throws Sender.SenderCreationException, ConnectionClosedException
-    {
-        return createSender(targetName, window, mode, linkName, null);
-    }
-    public Sender createSender(String targetName, int window, AcknowledgeMode mode, String linkName, Map<Binary, Outcome> unsettled)
-            throws Sender.SenderCreationException, ConnectionClosedException
-    {
-        return createSender(targetName, window, mode, linkName, false, unsettled);
-    }
-
-    public Sender createSender(String targetName, int window, AcknowledgeMode mode, String linkName,
-                               boolean isDurable, Map<Binary, Outcome> unsettled)
-            throws Sender.SenderCreationException, ConnectionClosedException
-    {
-        return new Sender(this, linkName == null ? "->" + targetName + '(' + UUID.randomUUID().toString()+')': linkName,
-                          targetName, null, window, mode, isDurable, unsettled);
-
-    }
-
-
-    public Receiver createReceiver(final String sourceAddr) throws ConnectionErrorException
-    {
-        return createReceiver(sourceAddr, null, AcknowledgeMode.ALO);
-    }
-
-
-    public Receiver createReceiver(final String queue, final AcknowledgeMode mode)
-            throws ConnectionErrorException
-    {
-        return createReceiver(queue, null, mode);
-    }
-
-    public Receiver createReceiver(final String queue, final AcknowledgeMode mode, String linkName)
-            throws ConnectionErrorException
-    {
-        return createReceiver(queue, null, mode, linkName);
-    }
-
-    public Receiver createReceiver(final String queue, final AcknowledgeMode mode, String linkName, boolean isDurable)
-            throws ConnectionErrorException
-    {
-        return createReceiver(queue, null, mode, linkName, isDurable);
-    }
-
-    public Receiver createReceiver(final String queue, final AcknowledgeMode mode, String linkName, boolean isDurable,
-                                   Map<Symbol, Filter> filters, Map<Binary, Outcome> unsettled)
-            throws ConnectionErrorException
-    {
-        return createReceiver(queue, null, mode, linkName, isDurable, filters, unsettled);
-    }
-
-
-    public Receiver createReceiver(final String queue, final AcknowledgeMode mode, String linkName,
-                                   boolean isDurable, Map<Binary, Outcome> unsettled)
-            throws ConnectionErrorException
-    {
-        return createReceiver(queue, null, mode, linkName, isDurable, unsettled);
-    }
-
-
-    private synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode)
-            throws ConnectionErrorException
-    {
-        return createReceiver(sourceAddr, mode, AcknowledgeMode.ALO);
-    }
-
-    private synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode, String linkName)
-            throws ConnectionErrorException
-    {
-        return createReceiver(sourceAddr, mode, AcknowledgeMode.ALO, linkName);
-    }
-
-
-    private synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode,
-                                            final AcknowledgeMode ackMode)
-            throws ConnectionErrorException
-    {
-        return createReceiver(sourceAddr, mode, ackMode, null);
-    }
-
-    private synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode,
-                                            final AcknowledgeMode ackMode, String linkName)
-            throws ConnectionErrorException
-    {
-        return createReceiver(sourceAddr,mode, ackMode, linkName, false);
-    }
-
-    private synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode,
-                                            final AcknowledgeMode ackMode, String linkName, boolean isDurable)
-            throws ConnectionErrorException
-    {
-        return createReceiver(sourceAddr, mode, ackMode, linkName, isDurable, null);
-    }
-
-    private synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode,
-                                            final AcknowledgeMode ackMode, String linkName, boolean isDurable,
-                                            Map<Binary, Outcome> unsettled)
-            throws ConnectionErrorException
-    {
-        return createReceiver(sourceAddr,mode,ackMode, linkName, isDurable, null, unsettled);
-    }
-
-    public synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode,
-                                            final AcknowledgeMode ackMode, String linkName, boolean isDurable,
-                                            Map<Symbol, Filter> filters, Map<Binary, Outcome> unsettled)
-            throws ConnectionErrorException
-    {
-
-        final Target target = new Target();
-        final Source source = new Source();
-        source.setAddress(sourceAddr);
-        source.setDistributionMode(mode);
-        source.setFilter(filters);
-
-        if(linkName == null)
-        {
-            linkName = sourceAddr + "-> (" + UUID.randomUUID().toString() + ")";
-        }
-
-        final Receiver receiver =
-                new Receiver(this, linkName,
-                        target, source, ackMode, isDurable, unsettled);
-        _receivers.add(receiver);
-
-        return receiver;
-
-    }
-
-    public synchronized Receiver createCopyingReceiver(final String sourceAddr) throws ConnectionErrorException
-    {
-        return createReceiver(sourceAddr, StdDistMode.COPY);
-    }
-
-    public synchronized Receiver createMovingReceiver(final String sourceAddr) throws ConnectionErrorException
-    {
-        return createReceiver(sourceAddr, StdDistMode.MOVE);
-    }
-
-    public Receiver createTemporaryQueueReceiver() throws AmqpErrorException, ConnectionErrorException
-    {
-        Source source = new Source();
-        source.setDynamic(true);
-
-        final Receiver receiver = new Receiver(this, "tempSender"+UUID.randomUUID().toString(), new Target(),
-                                               source, AcknowledgeMode.ALO);
-        _receivers.add(receiver);
-        return receiver;
-    }
-
-    public Sender createTemporaryQueueSender() throws Sender.SenderCreationException, ConnectionClosedException
-    {
-        Target target = new Target();
-        target.setDynamic(true);
-
-        final Sender sender;
-        sender = new Sender(this, "tempSender"+ UUID.randomUUID().toString(), target,
-                                                   new Source(), 0, AcknowledgeMode.ALO);
-        _senders.add(sender);
-        return sender;
-    }
-
-
-
-    public SessionEndpoint getEndpoint()
-    {
-        return _endpoint;
-    }
-
-    public synchronized void close()
-    {
-        try
-        {
-            for(Sender sender : new ArrayList<Sender>(_senders))
-            {
-                sender.close();
-            }
-            for(Receiver receiver : new ArrayList<Receiver>(_receivers))
-            {
-                receiver.detach();
-            }
-            if(_sessionLocalTC != null)
-            {
-                _sessionLocalTC.close();
-            }
-            _endpoint.end();
-        }
-        catch (Sender.SenderClosingException e)
-        {
-// TODO
-            e.printStackTrace();
-        }
-
-        //TODO
-
-    }
-
-    void removeSender(Sender sender)
-    {
-        _senders.remove(sender);
-    }
-
-    void removeReceiver(Receiver receiver)
-    {
-        _receivers.remove(receiver);
-    }
-
-    public SectionEncoder getSectionEncoder()
-    {
-        return _sectionEncoder;
-    }
-
-    public SectionDecoder getSectionDecoder()
-    {
-        return _sectionDecoder;
-    }
-
-
-    public Transaction createSessionLocalTransaction()
-    {
-        TransactionController localController = getSessionLocalTransactionController();
-        return localController.beginTransaction();
-    }
-
-    private TransactionController getSessionLocalTransactionController()
-    {
-        if(_sessionLocalTC == null)
-        {
-            _sessionLocalTC = createSessionLocalTransactionController();
-        }
-        return _sessionLocalTC;
-    }
-
-    private TransactionController createSessionLocalTransactionController()
-    {
-        String name = "txnControllerLink";
-        SendingLinkEndpoint tcLinkEndpoint = _endpoint.createTransactionController(name, TxnCapability.LOCAL_TXN,
-                                                                                   TxnCapability.MULTI_TXNS_PER_SSN);
-        tcLinkEndpoint.setReceivingSettlementMode(ReceiverSettleMode.FIRST);
-        tcLinkEndpoint.setSendingSettlementMode(SenderSettleMode.UNSETTLED);
-        tcLinkEndpoint.attach();
-        return new TransactionController(this, tcLinkEndpoint);
-    }
-
-
-    public Message receive()
-    {
-        while(getEndpoint().getState() == SessionState.ACTIVE)
-        {
-            synchronized (getEndpoint().getLock())
-            {
-                try
-                {
-                    for(Receiver r : _receivers)
-                    {
-                        Message m = r.receive(false);
-                        if(m != null)
-                            return m;
-                    }
-                    wait();
-                }
-                catch (InterruptedException e)
-                {
-                }
-            }
-        }
-        return null;
-    }
-
-    public Connection getConnection()
-    {
-        return _connection;
-    }
-
-    public void awaitActive()
-    {
-        synchronized(getEndpoint().getLock())
-        {
-            while(!getEndpoint().isEnded() && !getEndpoint().isActive())
-            {
-                try
-                {
-                    getEndpoint().getLock().wait();
-                }
-                catch (InterruptedException e)
-                {
-                    e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
-                }
-            }
-        }
-    }
-}
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.amqp_1_0.client;
+
+import org.apache.qpid.amqp_1_0.messaging.SectionDecoder;
+import org.apache.qpid.amqp_1_0.messaging.SectionDecoderImpl;
+import org.apache.qpid.amqp_1_0.messaging.SectionEncoder;
+import org.apache.qpid.amqp_1_0.messaging.SectionEncoderImpl;
+import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint;
+import org.apache.qpid.amqp_1_0.transport.SessionEndpoint;
+import org.apache.qpid.amqp_1_0.transport.SessionState;
+import org.apache.qpid.amqp_1_0.type.*;
+import org.apache.qpid.amqp_1_0.type.messaging.Filter;
+import org.apache.qpid.amqp_1_0.type.messaging.Source;
+import org.apache.qpid.amqp_1_0.type.messaging.StdDistMode;
+import org.apache.qpid.amqp_1_0.type.messaging.Target;
+import org.apache.qpid.amqp_1_0.type.transaction.TxnCapability;
+import org.apache.qpid.amqp_1_0.type.transport.ReceiverSettleMode;
+import org.apache.qpid.amqp_1_0.type.transport.SenderSettleMode;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+public class Session
+{
+    private SessionEndpoint _endpoint;
+    private List<Receiver> _receivers = new ArrayList<Receiver>();
+    private List<Sender> _senders = new ArrayList<Sender>();
+    private SectionEncoder _sectionEncoder;
+    private SectionDecoder _sectionDecoder;
+    private TransactionController _sessionLocalTC;
+    private Connection _connection;
+
+    public Session(final Connection connection, String name)
+    {
+        _connection = connection;
+        _endpoint = connection.getEndpoint().createSession(name);
+        _sectionEncoder = new SectionEncoderImpl(connection.getEndpoint().getDescribedTypeRegistry());
+        _sectionDecoder = new SectionDecoderImpl(connection.getEndpoint().getDescribedTypeRegistry());
+    }
+
+
+    public synchronized Sender createSender(final String targetName)
+            throws Sender.SenderCreationException, ConnectionClosedException
+    {
+        return createSender(targetName, false);
+    }
+
+    public synchronized Sender createSender(final String targetName, boolean synchronous)
+            throws Sender.SenderCreationException, ConnectionClosedException
+    {
+
+        final String sourceName = UUID.randomUUID().toString();
+        return new Sender(this, targetName+"<-"+sourceName, targetName, sourceName, synchronous);
+
+    }
+
+    public synchronized Sender createSender(final String targetName, int window)
+            throws Sender.SenderCreationException, ConnectionClosedException
+    {
+         final String sourceName = UUID.randomUUID().toString();
+         return new Sender(this, targetName+"<-"+sourceName, targetName, sourceName, window);
+
+    }
+
+    public Sender createSender(String targetName, int window, AcknowledgeMode mode)
+            throws Sender.SenderCreationException, ConnectionClosedException
+    {
+
+        return createSender(targetName, window, mode, null);
+    }
+
+    public Sender createSender(String targetName, int window, AcknowledgeMode mode, String linkName)
+            throws Sender.SenderCreationException, ConnectionClosedException
+    {
+        return createSender(targetName, window, mode, linkName, null);
+    }
+    public Sender createSender(String targetName, int window, AcknowledgeMode mode, String linkName, Map<Binary, Outcome> unsettled)
+            throws Sender.SenderCreationException, ConnectionClosedException
+    {
+        return createSender(targetName, window, mode, linkName, false, unsettled);
+    }
+
+    public Sender createSender(String targetName, int window, AcknowledgeMode mode, String linkName,
+                               boolean isDurable, Map<Binary, Outcome> unsettled)
+            throws Sender.SenderCreationException, ConnectionClosedException
+    {
+        return new Sender(this, linkName == null ? "->" + targetName + '(' + UUID.randomUUID().toString()+')': linkName,
+                          targetName, null, window, mode, isDurable, unsettled);
+
+    }
+
+
+    public Receiver createReceiver(final String sourceAddr) throws ConnectionErrorException
+    {
+        return createReceiver(sourceAddr, null, AcknowledgeMode.ALO);
+    }
+
+
+    public Receiver createReceiver(final String queue, final AcknowledgeMode mode)
+            throws ConnectionErrorException
+    {
+        return createReceiver(queue, null, mode);
+    }
+
+    public Receiver createReceiver(final String queue, final AcknowledgeMode mode, String linkName)
+            throws ConnectionErrorException
+    {
+        return createReceiver(queue, null, mode, linkName);
+    }
+
+    public Receiver createReceiver(final String queue, final AcknowledgeMode mode, String linkName, boolean isDurable)
+            throws ConnectionErrorException
+    {
+        return createReceiver(queue, null, mode, linkName, isDurable);
+    }
+
+    public Receiver createReceiver(final String queue, final AcknowledgeMode mode, String linkName, boolean isDurable,
+                                   Map<Symbol, Filter> filters, Map<Binary, Outcome> unsettled)
+            throws ConnectionErrorException
+    {
+        return createReceiver(queue, null, mode, linkName, isDurable, filters, unsettled);
+    }
+
+
+    public Receiver createReceiver(final String queue, final AcknowledgeMode mode, String linkName,
+                                   boolean isDurable, Map<Binary, Outcome> unsettled)
+            throws ConnectionErrorException
+    {
+        return createReceiver(queue, null, mode, linkName, isDurable, unsettled);
+    }
+
+
+    private synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode)
+            throws ConnectionErrorException
+    {
+        return createReceiver(sourceAddr, mode, AcknowledgeMode.ALO);
+    }
+
+    private synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode, String linkName)
+            throws ConnectionErrorException
+    {
+        return createReceiver(sourceAddr, mode, AcknowledgeMode.ALO, linkName);
+    }
+
+
+    private synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode,
+                                            final AcknowledgeMode ackMode)
+            throws ConnectionErrorException
+    {
+        return createReceiver(sourceAddr, mode, ackMode, null);
+    }
+
+    private synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode,
+                                            final AcknowledgeMode ackMode, String linkName)
+            throws ConnectionErrorException
+    {
+        return createReceiver(sourceAddr,mode, ackMode, linkName, false);
+    }
+
+    private synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode,
+                                            final AcknowledgeMode ackMode, String linkName, boolean isDurable)
+            throws ConnectionErrorException
+    {
+        return createReceiver(sourceAddr, mode, ackMode, linkName, isDurable, null);
+    }
+
+    private synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode,
+                                            final AcknowledgeMode ackMode, String linkName, boolean isDurable,
+                                            Map<Binary, Outcome> unsettled)
+            throws ConnectionErrorException
+    {
+        return createReceiver(sourceAddr,mode,ackMode, linkName, isDurable, null, unsettled);
+    }
+
+    public synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode,
+                                            final AcknowledgeMode ackMode, String linkName, boolean isDurable,
+                                            Map<Symbol, Filter> filters, Map<Binary, Outcome> unsettled)
+            throws ConnectionErrorException
+    {
+
+        final Target target = new Target();
+        final Source source = new Source();
+        source.setAddress(sourceAddr);
+        source.setDistributionMode(mode);
+        source.setFilter(filters);
+
+        if(linkName == null)
+        {
+            linkName = sourceAddr + "-> (" + UUID.randomUUID().toString() + ")";
+        }
+
+        final Receiver receiver =
+                new Receiver(this, linkName,
+                        target, source, ackMode, isDurable, unsettled);
+        _receivers.add(receiver);
+
+        return receiver;
+
+    }
+
+    public synchronized Receiver createCopyingReceiver(final String sourceAddr) throws ConnectionErrorException
+    {
+        return createReceiver(sourceAddr, StdDistMode.COPY);
+    }
+
+    public synchronized Receiver createMovingReceiver(final String sourceAddr) throws ConnectionErrorException
+    {
+        return createReceiver(sourceAddr, StdDistMode.MOVE);
+    }
+
+    public Receiver createTemporaryQueueReceiver() throws AmqpErrorException, ConnectionErrorException
+    {
+        Source source = new Source();
+        source.setDynamic(true);
+
+        final Receiver receiver = new Receiver(this, "tempSender"+UUID.randomUUID().toString(), new Target(),
+                                               source, AcknowledgeMode.ALO);
+        _receivers.add(receiver);
+        return receiver;
+    }
+
+    public Sender createTemporaryQueueSender() throws Sender.SenderCreationException, ConnectionClosedException
+    {
+        Target target = new Target();
+        target.setDynamic(true);
+
+        final Sender sender;
+        sender = new Sender(this, "tempSender"+ UUID.randomUUID().toString(), target,
+                                                   new Source(), 0, AcknowledgeMode.ALO);
+        _senders.add(sender);
+        return sender;
+    }
+
+
+
+    public SessionEndpoint getEndpoint()
+    {
+        return _endpoint;
+    }
+
+    public synchronized void close()
+    {
+        try
+        {
+            for(Sender sender : new ArrayList<Sender>(_senders))
+            {
+                sender.close();
+            }
+            for(Receiver receiver : new ArrayList<Receiver>(_receivers))
+            {
+                receiver.detach();
+            }
+            if(_sessionLocalTC != null)
+            {
+                _sessionLocalTC.close();
+            }
+            _endpoint.end();
+        }
+        catch (Sender.SenderClosingException e)
+        {
+// TODO
+            e.printStackTrace();
+        }
+
+        //TODO
+
+    }
+
+    void removeSender(Sender sender)
+    {
+        _senders.remove(sender);
+    }
+
+    void removeReceiver(Receiver receiver)
+    {
+        _receivers.remove(receiver);
+    }
+
+    public SectionEncoder getSectionEncoder()
+    {
+        return _sectionEncoder;
+    }
+
+    public SectionDecoder getSectionDecoder()
+    {
+        return _sectionDecoder;
+    }
+
+
+    public Transaction createSessionLocalTransaction()
+    {
+        TransactionController localController = getSessionLocalTransactionController();
+        return localController.beginTransaction();
+    }
+
+    private TransactionController getSessionLocalTransactionController()
+    {
+        if(_sessionLocalTC == null)
+        {
+            _sessionLocalTC = createSessionLocalTransactionController();
+        }
+        return _sessionLocalTC;
+    }
+
+    private TransactionController createSessionLocalTransactionController()
+    {
+        String name = "txnControllerLink";
+        SendingLinkEndpoint tcLinkEndpoint = _endpoint.createTransactionController(name, TxnCapability.LOCAL_TXN,
+                                                                                   TxnCapability.MULTI_TXNS_PER_SSN);
+        tcLinkEndpoint.setReceivingSettlementMode(ReceiverSettleMode.FIRST);
+        tcLinkEndpoint.setSendingSettlementMode(SenderSettleMode.UNSETTLED);
+        tcLinkEndpoint.attach();
+        return new TransactionController(this, tcLinkEndpoint);
+    }
+
+
+    public Message receive()
+    {
+        while(getEndpoint().getState() == SessionState.ACTIVE)
+        {
+            synchronized (getEndpoint().getLock())
+            {
+                try
+                {
+                    for(Receiver r : _receivers)
+                    {
+                        Message m = r.receive(false);
+                        if(m != null)
+                            return m;
+                    }
+                    wait();
+                }
+                catch (InterruptedException e)
+                {
+                }
+            }
+        }
+        return null;
+    }
+
+    public Connection getConnection()
+    {
+        return _connection;
+    }
+
+    public void awaitActive()
+    {
+        synchronized(getEndpoint().getLock())
+        {
+            while(!getEndpoint().isEnded() && !getEndpoint().isActive())
+            {
+                try
+                {
+                    getEndpoint().getLock().wait();
+                }
+                catch (InterruptedException e)
+                {
+                    e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
+                }
+            }
+        }
+    }
+}

Propchange: qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Transaction.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TransactionController.java
------------------------------------------------------------------------------
    svn:eol-style = native



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message