qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From r..@apache.org
Subject svn commit: r648692 [2/3] - in /incubator/qpid/trunk/qpid/java: ./ client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/ client/example/src/main/java/org/apache/qpid/example/amqpexample/fanout/ client/example/src/main/java/org/apache...
Date Wed, 16 Apr 2008 13:32:20 GMT
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Session.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Session.java?rev=648692&r1=648691&r2=648692&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Session.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Session.java Wed Apr 16 06:32:13 2008
@@ -41,8 +41,8 @@
 {
     public static final short TRANSFER_ACQUIRE_MODE_NO_ACQUIRE = 1;
     public static final short TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE = 0;
-    public static final short TRANSFER_CONFIRM_MODE_REQUIRED = 1;
-    public static final short TRANSFER_CONFIRM_MODE_NOT_REQUIRED = 0;
+    public static final short TRANSFER_CONFIRM_MODE_REQUIRED = 0;
+    public static final short TRANSFER_CONFIRM_MODE_NOT_REQUIRED = 1;
     public static final short MESSAGE_FLOW_MODE_CREDIT = 0;
     public static final short MESSAGE_FLOW_MODE_WINDOW = 1;
     public static final short MESSAGE_FLOW_UNIT_MESSAGE = 0;
@@ -63,17 +63,11 @@
      */
     public void sync();
 
-    /**
-     * Close this session and any associated resources.
-     */
-    public void sessionClose();
+    public void sessionDetach(byte[] name);
 
-    /**
-     * Suspend this session resulting in interrupting the traffic with the broker.
-     * <p> The session timer will start to tick in suspend.
-     * <p> When a session is suspend any operation of this session and of the associated resources are unavailable.
-     */
-    public void sessionSuspend();
+    public void sessionRequestTimeout(long expiry);
+
+    public byte[] getName();
 
     //------------------------------------------------------
     //                 Messaging methods
@@ -177,7 +171,7 @@
      *                    is acquired when the transfer starts
      *                    </ul>
      */
-    public void messageTransfer(String destination, short confirmMode, short acquireMode);
+    public void messageTransfer(String destination, MessageAcceptMode acceptMode, MessageAcquireMode acquireMode);
 
     /**
      * Add the following set of headers to the message being sent.
@@ -301,7 +295,7 @@
      * @param mode        <ul> <li>credit ({@link Session#MESSAGE_FLOW_MODE_CREDIT}): choose credit based flow control
      *                    <li> window ({@link Session#MESSAGE_FLOW_MODE_WINDOW}): choose window based flow control</ul>
      */
-    public void messageFlowMode(String destination, short mode);
+    public void messageSetFlowMode(String destination, MessageFlowMode mode);
 
 
     /**
@@ -322,7 +316,7 @@
      *                    </ul>
      * @param value       Number of credits, a value of 0 indicates an infinite amount of credit.
      */
-    public void messageFlow(String destination, short unit, long value);
+    public void messageFlow(String destination, MessageCreditUnit unit, long value);
 
     /**
      * Forces the broker to exhaust its credit supply.
@@ -364,24 +358,7 @@
      *               failed).
      * @param text   String describing the reason for a message transfer rejection.
      */
-    public void messageReject(RangeSet ranges, int code, String text);
-
-    /**
-     * This method asks the broker to redeliver all unacknowledged messages on a specified session.
-     * Zero or more messages may be redelivered. This method is only allowed on non-transacted
-     * sessions.
-     * <p> Following are valid options:
-     * <ul>
-     * <li>{@link Option#REQUEUE}: <p>IIf this field is not set, the message will be redelivered to the original recipient.
-     * If this option is ser, the server will attempt to requeue the message, potentially then delivering it
-     * to an alternative subscriber.
-     * <p/>
-     * </ul>
-     *
-     * @param _options see available options
-     */
-    public void messageRecover(Option... _options);
-
+    public void messageReject(RangeSet ranges, MessageRejectCode code, String text);
 
     /**
      * As it is possible that the broker does not manage to reject some messages, after completion of
@@ -407,31 +384,9 @@
      * The outcome of the acquisition is returned as an array of ranges of acquired messages.
      * <p> This method should only be called on non-acquired messages.
      *
-     * @param mode   One of: <ul>
-     *               <li> any ({@link Session#MESSAGE_ACQUIRE_ANY_AVAILABLE_MESSAGE}): acquire any available
-     *               messages for consumption
-     *               <li> all ({@link Session#MESSAGE_ACQUIRE_MESSAGES_IF_ALL_ARE_AVAILABLE}): only acquire messages
-     *               if all are available for consumption
-     *               </ul>
      * @param ranges Ranges of messages to be acquired.
      */
-    public void messageAcquire(RangeSet ranges, short mode);
-
-    /**
-     * As it is possible that the broker does not manage to acquire some messages, after completion of
-     * {@link Session#messageAcquire} this method will return the ranges of acquired messages.
-     * <p> Note that {@link Session#messageAcquire} and this methods are asynchronous therefore for accessing to the
-     * previously acquired messages this method must be invoked in conjunction with {@link Session#sync()}.
-     * <p> A recommended invocation sequence would be:
-     * <ul>
-     * <li> {@link Session#messageAcquire}
-     * <li> {@link Session#sync()}
-     * <li> {@link Session#getAccquiredMessages()}
-     * </ul>
-     *
-     * @return returns the message ranges marked by the broker as acquired.
-     */
-    public RangeSet getAccquiredMessages();
+    public Future<Acquired> messageAcquire(RangeSet ranges);
 
     /**
      * Give up responsibility for processing ranges of messages.
@@ -439,7 +394,7 @@
      *
      * @param ranges Ranges of messages to be released.
      */
-    public void messageRelease(RangeSet ranges);
+    public void messageRelease(RangeSet ranges, Option ... options);
 
     // -----------------------------------------------
     //            Local transaction methods
@@ -519,7 +474,7 @@
      *                     routing keys depends on the exchange implementation.
      * @param arguments    Used for backward compatibility
      */
-    public void queueBind(String queueName, String exchangeName, String routingKey, Map<String, Object> arguments);
+    public void exchangeBind(String queueName, String exchangeName, String routingKey, Map<String, Object> arguments);
 
     /**
      * Unbind a queue from an exchange.
@@ -527,9 +482,8 @@
      * @param queueName    Specifies the name of the queue to unbind.
      * @param exchangeName The name of the exchange to unbind from.
      * @param routingKey   Specifies the routing key of the binding to unbind.
-     * @param arguments    Used for backward compatibility
      */
-    public void queueUnbind(String queueName, String exchangeName, String routingKey, Map<String, Object> arguments);
+    public void exchangeUnbind(String queueName, String exchangeName, String routingKey);
 
     /**
      * This method removes all messages from a queue. It does not cancel consumers. Purged messages
@@ -582,8 +536,8 @@
      * @param arguments  backward-compatibility parameters.
      * @return Information on the specified binding.
      */
-    public Future<BindingQueryResult> bindingQuery(String exchange, String queue, String routingKey,
-                                                   Map<String, Object> arguments);
+    public Future<ExchangeBoundResult> exchangeBound(String exchange, String queue, String routingKey,
+                                                     Map<String, Object> arguments);
 
     // --------------------------------------
     //              exchange methods

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java?rev=648692&r1=648691&r2=648692&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java Wed Apr 16 06:32:13 2008
@@ -10,6 +10,8 @@
 import org.apache.qpidity.api.Message;
 import org.apache.qpidity.nclient.ClosedListener;
 import org.apache.qpidity.nclient.MessagePartListener;
+import org.apache.qpidity.transport.MessageAcceptMode;
+import org.apache.qpidity.transport.MessageAcquireMode;
 import org.apache.qpidity.transport.Option;
 import org.apache.qpidity.transport.Range;
 import org.apache.qpidity.transport.RangeSet;
@@ -45,13 +47,17 @@
 
     private static  long MAX_NOT_SYNC_DATA_LENGH;
     private static  long MAX_NOT_FLUSH_DATA_LENGH;
+
     private Map<String,MessagePartListener> _messageListeners = new HashMap<String,MessagePartListener>();
     private ClosedListener _exceptionListner;
-    private RangeSet _acquiredMessages;
     private RangeSet _rejectedMessages;
     private long _currentDataSizeNotSynced;
     private long _currentDataSizeNotFlushed;
 
+    public ClientSession(byte[] name)
+    {
+        super(name);
+    }
 
     public void messageAcknowledge(RangeSet ranges)
     {
@@ -60,20 +66,24 @@
             super.processed(range);
         }
         super.flushProcessed();
+        messageAccept(ranges);
     }
 
-    public void messageSubscribe(String queue, String destination, short confirmMode, short acquireMode, MessagePartListener listener, Map<String, Object> filter, Option... options)
+    public void messageSubscribe(String queue, String destination, short acceptMode, short acquireMode, MessagePartListener listener, Map<String, Object> filter, Option... options)
     {
         setMessageListener(destination,listener);
-        super.messageSubscribe(queue, destination, confirmMode, acquireMode, filter, options);
+        super.messageSubscribe(queue, destination, MessageAcceptMode.get(acceptMode),
+                               MessageAcquireMode.get(acquireMode), null, 0, filter,
+                               options);
     }
 
-    public void messageTransfer(String destination, Message msg, short confirmMode, short acquireMode) throws IOException
+    public void messageTransfer(String destination, Message msg, short acceptMode, short acquireMode) throws IOException
     {
         // The javadoc clearly says that this method is suitable for small messages
         // therefore reading the content in one shot.
         ByteBuffer  data = msg.readData();
-        super.messageTransfer(destination, confirmMode, acquireMode);
+        super.messageTransfer(destination, MessageAcceptMode.get(acceptMode),
+                              MessageAcquireMode.get(acquireMode));
        // super.header(msg.getDeliveryProperties(),msg.getMessageProperties()  );
         if( msg.getHeader() == null || msg.getDeliveryProperties().isDirty() || msg.getMessageProperties().isDirty() )
         {
@@ -118,9 +128,10 @@
         super.data(bytes);
     }
 
-    public void messageStream(String destination, Message msg, short confirmMode, short acquireMode) throws IOException
+    public void messageStream(String destination, Message msg, short acceptMode, short acquireMode) throws IOException
     {
-        super.messageTransfer(destination, confirmMode, acquireMode);
+        super.messageTransfer(destination, MessageAcceptMode.get(acceptMode),
+                              MessageAcquireMode.get(acquireMode));
         super.header(msg.getDeliveryProperties(),msg.getMessageProperties());
         boolean b = true;
         int count = 0;
@@ -153,11 +164,6 @@
         }*/
     }
 
-    public RangeSet getAccquiredMessages()
-    {
-        return _acquiredMessages;
-    }
-
     public RangeSet getRejectedMessages()
     {
         return _rejectedMessages;
@@ -175,11 +181,6 @@
     public void setClosedListener(ClosedListener exceptionListner)
     {
         _exceptionListner = exceptionListner;
-    }
-
-    void setAccquiredMessages(RangeSet acquiredMessages)
-    {
-        _acquiredMessages = acquiredMessages;
     }
 
     void setRejectedMessages(RangeSet rejectedMessages)

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSessionDelegate.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSessionDelegate.java?rev=648692&r1=648691&r2=648692&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSessionDelegate.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSessionDelegate.java Wed Apr 16 06:32:13 2008
@@ -9,12 +9,11 @@
 import org.apache.qpidity.QpidException;
 import org.apache.qpidity.transport.Data;
 import org.apache.qpidity.transport.Header;
-import org.apache.qpidity.transport.MessageAcquired;
 import org.apache.qpidity.transport.MessageReject;
 import org.apache.qpidity.transport.MessageTransfer;
 import org.apache.qpidity.transport.Range;
 import org.apache.qpidity.transport.Session;
-import org.apache.qpidity.transport.SessionClosed;
+import org.apache.qpidity.transport.SessionDetached;
 import org.apache.qpidity.transport.SessionDelegate;
 
 
@@ -23,9 +22,9 @@
     private MessageTransfer _currentTransfer;
     private MessagePartListener _currentMessageListener;
     
-    @Override public void sessionClosed(Session ssn,SessionClosed sessionClosed)
+    @Override public void sessionDetached(Session ssn, SessionDetached dtc)
     {
-        ((ClientSession)ssn).notifyException(new QpidException(sessionClosed.getReplyText(),ErrorCode.get(sessionClosed.getReplyCode()),null));
+        ((ClientSession)ssn).notifyException(new QpidException("", ErrorCode.get(dtc.getCode().getValue()),null));
     }
     
     //  --------------------------------------------
@@ -76,10 +75,5 @@
         ((ClientSession)session).notifyException(new QpidException("Message Rejected",ErrorCode.MESSAGE_REJECTED,null));
         session.processed(struct);
     }
-    
-    @Override public void messageAcquired(Session session, MessageAcquired struct) 
-    {
-        ((ClientSession)session).setAccquiredMessages(struct.getTransfers());
-        session.processed(struct);
-    }
+
 }

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/DemoClient.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/DemoClient.java?rev=648692&r1=648691&r2=648692&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/DemoClient.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/DemoClient.java Wed Apr 16 06:32:13 2008
@@ -9,8 +9,12 @@
 import org.apache.qpidity.nclient.util.MessageListener;
 import org.apache.qpidity.nclient.util.MessagePartListenerAdapter;
 import org.apache.qpidity.transport.DeliveryProperties;
+import org.apache.qpidity.transport.MessageAcceptMode;
+import org.apache.qpidity.transport.MessageAcquireMode;
 import org.apache.qpidity.transport.MessageProperties;
 
+import java.util.UUID;
+
 public class DemoClient
 {
     public static MessagePartListenerAdapter createAdapter()
@@ -46,19 +50,20 @@
                      }
                 });
         ssn.queueDeclare("queue1", null, null);
-        ssn.queueBind("queue1", "amq.direct", "queue1",null);
+        ssn.exchangeBind("queue1", "amq.direct", "queue1",null);
         ssn.sync();
 
         ssn.messageSubscribe("queue1", "myDest", (short)0, (short)0,createAdapter(), null);
 
         // queue
-        ssn.messageTransfer("amq.direct", (short) 0, (short) 1);
-        ssn.header(new DeliveryProperties().setRoutingKey("queue1"),new MessageProperties().setMessageId("123"));
+        ssn.messageTransfer("amq.direct", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED);
+        ssn.header(new DeliveryProperties().setRoutingKey("queue1"),
+                   new MessageProperties().setMessageId(UUID.randomUUID()));
         ssn.data("this is the data");
         ssn.endData();
 
         //reject
-        ssn.messageTransfer("amq.direct", (short) 0, (short) 1);
+        ssn.messageTransfer("amq.direct", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED);
         ssn.data("this should be rejected");
         ssn.header(new DeliveryProperties().setRoutingKey("stocks"));
         ssn.endData();
@@ -71,17 +76,18 @@
         ssn.sync();
 
         ssn.queueDeclare("topic1", null, null);
-        ssn.queueBind("topic1", "amq.topic", "stock.*",null);
+        ssn.exchangeBind("topic1", "amq.topic", "stock.*",null);
         ssn.queueDeclare("topic2", null, null);
-        ssn.queueBind("topic2", "amq.topic", "stock.us.*",null);
+        ssn.exchangeBind("topic2", "amq.topic", "stock.us.*",null);
         ssn.queueDeclare("topic3", null, null);
-        ssn.queueBind("topic3", "amq.topic", "stock.us.rh",null);
+        ssn.exchangeBind("topic3", "amq.topic", "stock.us.rh",null);
         ssn.sync();
 
         // topic
-        ssn.messageTransfer("amq.topic", (short) 0, (short) 1);
+        ssn.messageTransfer("amq.topic", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED);
         ssn.data("Topic message");
-        ssn.header(new DeliveryProperties().setRoutingKey("stock.us.ibm"),new MessageProperties().setMessageId("456"));
+        ssn.header(new DeliveryProperties().setRoutingKey("stock.us.ibm"),
+                   new MessageProperties().setMessageId(UUID.randomUUID()));
         ssn.endData();
     }
 

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/LargeMsgDemoClient.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/LargeMsgDemoClient.java?rev=648692&r1=648691&r2=648692&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/LargeMsgDemoClient.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/LargeMsgDemoClient.java Wed Apr 16 06:32:13 2008
@@ -14,6 +14,8 @@
 import org.apache.qpidity.transport.DeliveryProperties;
 import org.apache.qpidity.transport.MessageProperties;
 
+import java.util.UUID;
+
 public class LargeMsgDemoClient
 {
     public static MessagePartListenerAdapter createAdapter()
@@ -49,7 +51,7 @@
                      }
                 });
         ssn.queueDeclare("queue1", null, null);
-        ssn.queueBind("queue1", "amq.direct", "queue1",null);
+        ssn.exchangeBind("queue1", "amq.direct", "queue1",null);
         ssn.sync();
 
         ssn.messageSubscribe("queue1", "myDest", (short)0, (short)0,createAdapter(), null);
@@ -59,7 +61,7 @@
            FileMessage msg = new FileMessage(new FileInputStream("/home/rajith/TestFile"),
                                              1024,
                                              new DeliveryProperties().setRoutingKey("queue1"),
-                                             new MessageProperties().setMessageId("123"));
+                                             new MessageProperties().setMessageId(UUID.randomUUID()));
 
            // queue
            ssn.messageStream("amq.direct",msg, (short) 0, (short) 1);

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/interop/BasicInteropTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/interop/BasicInteropTest.java?rev=648692&r1=648691&r2=648692&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/interop/BasicInteropTest.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/interop/BasicInteropTest.java Wed Apr 16 06:32:13 2008
@@ -13,6 +13,10 @@
 import org.apache.qpidity.nclient.util.MessageListener;
 import org.apache.qpidity.nclient.util.MessagePartListenerAdapter;
 import org.apache.qpidity.transport.DeliveryProperties;
+import org.apache.qpidity.transport.MessageAcceptMode;
+import org.apache.qpidity.transport.MessageAcquireMode;
+import org.apache.qpidity.transport.MessageCreditUnit;
+import org.apache.qpidity.transport.MessageFlowMode;
 import org.apache.qpidity.transport.MessageProperties;
 import org.apache.qpidity.transport.RangeSet;
 
@@ -66,14 +70,14 @@
         System.out.println("------- Queue created --------");
 
         System.out.println("------- Binding a queue --------");
-        session.queueBind("testQueue", "test", "testKey", null);
+        session.exchangeBind("testQueue", "test", "testKey", null);
         session.sync();
         System.out.println("------- Queue bound --------");
     }
 
     public void testSendMessage(){
         System.out.println("------- Sending a message --------");
-        session.messageTransfer("test", Session.TRANSFER_CONFIRM_MODE_REQUIRED, Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE);
+        session.messageTransfer("test", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED);
 
         Map<String,Object> props = new HashMap<String,Object>();
         props.put("name", "rajith");
@@ -111,11 +115,10 @@
                                  null);
 
         System.out.println("------- Setting Credit mode --------");
-        session.messageFlowMode("myDest", Session.MESSAGE_FLOW_MODE_WINDOW);
+        session.messageSetFlowMode("myDest", MessageFlowMode.WINDOW);
         System.out.println("------- Setting Credit --------");
-        session.messageFlow("myDest", Session.MESSAGE_FLOW_UNIT_MESSAGE, 1);
-        //session.messageFlow("myDest", Session.MESSAGE_FLOW_UNIT_BYTE, 0xFFFFFFFF);
-        session.messageFlow("myDest", Session.MESSAGE_FLOW_UNIT_BYTE, -1);
+        session.messageFlow("myDest", MessageCreditUnit.MESSAGE, 1);
+        session.messageFlow("myDest", MessageCreditUnit.BYTE, -1);
     }
 
     public void testMessageFlush()

Modified: incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java?rev=648692&r1=648691&r2=648692&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java Wed Apr 16 06:32:13 2008
@@ -287,7 +287,7 @@
 
         con.start();
 
-        long waitTime = 300000L;
+        long waitTime = 30000L;
         long waitUntilTime = System.currentTimeMillis() + waitTime;
 
         synchronized (lock)

Modified: incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/close/TopicPublisherCloseTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/close/TopicPublisherCloseTest.java?rev=648692&r1=648691&r2=648692&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/close/TopicPublisherCloseTest.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/close/TopicPublisherCloseTest.java Wed Apr 16 06:32:13 2008
@@ -50,6 +50,9 @@
 
     public void testAllMethodsThrowAfterConnectionClose() throws Exception
     {
+        // give external brokers a chance to start up
+        Thread.sleep(3000);
+
         AMQConnection connection =   (AMQConnection) getConnection("guest", "guest");
 
         Topic destination1 = new AMQTopic(connection, "t1");

Modified: incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java?rev=648692&r1=648691&r2=648692&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java Wed Apr 16 06:32:13 2008
@@ -276,7 +276,7 @@
         _session.commit();
         assertNotNull("test message was consumed and rolled back, but is gone", result);
         assertEquals("test message was correct message", MESSAGE_TEXT, ((TextMessage) result).getText());
-        assertTrue("Messasge is not marked as redelivered", result.getJMSRedelivered());
+        assertTrue("Message is not marked as redelivered", result.getJMSRedelivered());
     }
 
     /**
@@ -318,7 +318,7 @@
         _session.commit();
         assertNotNull("test message was consumed and rolled back, but is gone", result);
         assertEquals("test message was correct message", MESSAGE_TEXT, ((TextMessage) result).getText());
-        assertTrue("Messasge is not marked as redelivered", result.getJMSRedelivered());
+        assertTrue("Message is not marked as redelivered", result.getJMSRedelivered());
     }
 
     /**
@@ -430,7 +430,7 @@
         _pubSession.commit();
 
         _logger.info("getting test message");
-        Message result = _consumer.receive(1000);
+        Message result = _consumer.receive(5000);
 
         assertNotNull("Message received should not be null", result);
         assertEquals("1", ((TextMessage) result).getText());
@@ -444,39 +444,24 @@
 
         _logger.info("receiving result");
 
-// NOTE: Both msg 1 & 2 will be marked as redelivered as they have both will have been rejected.
-// Only the occasion where it is not rejected will it mean it hasn't arrived at the client yet.
-        result = _consumer.receive(1000);
-        assertNotNull("test message was consumed and rolled back, but is gone", result);
 
-// The first message back will be either 1 or 2 being redelivered
-        if (result.getJMSRedelivered())
-        {
-            assertTrue("Messasge is not marked as redelivered" + result, result.getJMSRedelivered());
-        }
-        else // or it will be msg 2 arriving the first time due to latency.
-        {
-            _logger.info("Message 2 wasn't prefetched so wasn't rejected");
-            assertEquals("2", ((TextMessage) result).getText());
-        }
+        // Message 2 may be marked as redelivered if it was prefetched.
+        result = _consumer.receive(5000);
+        assertNotNull("Second message was not consumed, but is gone", result);
 
-        result = _consumer.receive(1000);
+        // The first message back will be 2, message 1 has been received but not committed
+        // Closing the consumer does not commit the session.
 
-        if (isBroker08())
+        // if this is message 1 then it should be marked as redelivered
+        if("1".equals(((TextMessage) result).getText()))
         {
-            assertNotNull("test message was consumed and rolled back, but is gone", result);
-           // assertTrue("Messasge is not marked as redelivered" + result, result.getJMSRedelivered());
-        }
-        else
-        {
-            assertNull("test message was consumed and not rolled back, but is redelivered", result);
+            fail("First message was recieved again");
         }
 
         result = _consumer.receive(1000);
         assertNull("test message should be null:" + result, result);
 
         _session.commit();
-
     }
 
     public void testPutThenRollbackThenGet() throws Exception

Modified: incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java?rev=648692&r1=648691&r2=648692&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java Wed Apr 16 06:32:13 2008
@@ -219,14 +219,7 @@
             // ensure sent messages are not visible and received messages are requeued
             expect("RB_A", consumer1.receive(1000), true);
             expect("RB_B", consumer1.receive(1000), true);
-            if( isBroker08() )
-            {
-                expect("RB_C", consumer1.receive(1000), true);
-            }
-            else
-            {
-                  expect("RB_C", consumer1.receive(1000), false);
-            }
+            expect("RB_C", consumer1.receive(1000), true);
             _logger.info("Starting new connection");
             testCon.start();
             testConsumer1 = testSession.createConsumer(queue1);

Added: incubator/qpid/trunk/qpid/java/common/Composite.tpl
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/Composite.tpl?rev=648692&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/Composite.tpl (added)
+++ incubator/qpid/trunk/qpid/java/common/Composite.tpl Wed Apr 16 06:32:13 2008
@@ -0,0 +1,173 @@
+package org.apache.qpidity.transport;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.qpidity.transport.codec.Decoder;
+import org.apache.qpidity.transport.codec.Encodable;
+import org.apache.qpidity.transport.codec.Encoder;
+
+import org.apache.qpidity.transport.network.Frame;
+
+${
+from genutil import *
+
+cls = klass(type)["@name"]
+
+if type.name in ("control", "command"):
+  base = "Method"
+  size = 0
+  pack = 2
+  if type["segments"]:
+    payload = "true"
+  else:
+    payload = "false"
+  if type.name == "control" and cls == "connection":
+    track = "Frame.L1"
+  elif cls == "session" and type["@name"] in ("attach", "attached", "detach", "detached"):
+    track = "Frame.L2"
+  elif type.name == "command":
+    track = "Frame.L4"
+  else:
+    track = "Frame.L3"
+else:
+  base = "Struct"
+  size = type["@size"]
+  pack = type["@pack"]
+  payload = "false"
+  track = "-1"
+
+typecode = code(type)
+}
+
+public class $name extends $base {
+
+    public static final int TYPE = $typecode;
+
+    public final int getStructType() {
+        return TYPE;
+    }
+
+    public final int getSizeWidth() {
+        return $size;
+    }
+
+    public final int getPackWidth() {
+        return $pack;
+    }
+
+    public final boolean hasPayload() {
+        return $payload;
+    }
+
+    public final byte getEncodedTrack() {
+        return $track;
+    }
+
+    private static final List<Field<?,?>> FIELDS = new ArrayList<Field<?,?>>();
+    public List<Field<?,?>> getFields() { return FIELDS; }
+
+${
+fields = get_fields(type)
+params = get_parameters(fields)
+options = get_options(fields)
+
+for f in fields:
+  out("    private boolean has_$(f.name);\n")
+  out("    private $(f.type) $(f.name);\n")
+}
+
+${
+if fields:
+  out("    public $name() {}\n")
+}
+
+    public $name($(", ".join(params))) {
+${
+for f in fields:
+  if f.option: continue
+  out("        $(f.set)($(f.name));\n")
+
+for f in options:
+  out("        boolean _$(f.name) = false;\n")
+
+if options:
+  out("""
+        for (int i=0; i < _options.length; i++) {
+            switch (_options[i]) {
+""")
+
+  for f in options:
+    out("            case $(f.option): _$(f.name) = true; break;\n")
+
+  out("""            case NO_OPTION: break;
+            default: throw new IllegalArgumentException("invalid option: " + _options[i]);
+            }
+        }
+""")
+
+for f in options:
+  out("        $(f.set)(_$(f.name));\n")
+}
+    }
+
+    public <C> void dispatch(C context, MethodDelegate<C> delegate) {
+        delegate.$(dromedary(name))(context, this);
+    }
+
+${
+for f in fields:
+  out("""
+    public final boolean $(f.has)() {
+        return has_$(f.name);
+    }
+
+    public final $name $(f.clear)() {
+        this.has_$(f.name) = false;
+        this.$(f.name) = $(f.default);
+        this.dirty = true;
+        return this;
+    }
+
+    public final $(f.type) $(f.get)() {
+        return $(f.name);
+    }
+
+    public final $name $(f.set)($(f.type) value) {
+        this.$(f.name) = value;
+        this.has_$(f.name) = true;
+        this.dirty = true;
+        return this;
+    }
+
+    public final $name $(f.name)($(f.type) value) {
+        this.$(f.name) = value;
+        this.has_$(f.name) = true;
+        this.dirty = true;
+        return this;
+    }
+
+    static {
+        FIELDS.add(new Field<$name,$(jref(jclass(f.type)))>($name.class, $(jref(jclass(f.type))).class, "$(f.name)", $(f.index)) {
+            public boolean has(Object struct) {
+                return check(struct).has_$(f.name);
+            }
+            public void has(Object struct, boolean value) {
+                check(struct).has_$(f.name) = value;
+            }
+            public $(jref(f.type)) get(Object struct) {
+                return check(struct).$(f.get)();
+            }
+            public void read(Decoder dec, Object struct) {
+                check(struct).$(f.name) = $(f.read);
+                check(struct).dirty = true;
+            }
+            public void write(Encoder enc, Object struct) {
+               $(f.write);
+            }
+        });
+    }
+""")
+}}

Added: incubator/qpid/trunk/qpid/java/common/Constant.tpl
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/Constant.tpl?rev=648692&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/Constant.tpl (added)
+++ incubator/qpid/trunk/qpid/java/common/Constant.tpl Wed Apr 16 06:32:13 2008
@@ -0,0 +1,14 @@
+package org.apache.qpidity.transport;
+
+${from genutil import *}
+
+public interface Constant
+{
+${
+constants = spec.query["amqp/constant"]
+
+for c in constants:
+  name = scream(c["@name"])
+  value = c["@value"]
+  out("    public static final int $name = $value;\n")
+}}

Added: incubator/qpid/trunk/qpid/java/common/Enum.tpl
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/Enum.tpl?rev=648692&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/Enum.tpl (added)
+++ incubator/qpid/trunk/qpid/java/common/Enum.tpl Wed Apr 16 06:32:13 2008
@@ -0,0 +1,36 @@
+package org.apache.qpidity.transport;
+
+public enum $name {
+${
+from genutil import *
+
+vtype = jtype(resolve_type(type))
+
+choices = [(scream(ch["@name"]), "(%s) %s" % (vtype, ch["@value"]))
+           for ch in type.query["enum/choice"]]
+}
+    $(",\n    ".join(["%s(%s)" % ch for ch in choices]));
+
+    private final $vtype value;
+
+    $name($vtype value)
+    {
+        this.value = value;
+    }
+
+    public $vtype getValue()
+    {
+        return value;
+    }
+
+    public static $name get($vtype value)
+    {
+        switch (value)
+        {
+${
+for ch, value in choices:
+  out('        case $value: return $ch;\n')
+}        default: throw new IllegalArgumentException("no such value: " + value);
+        }
+    }
+}

Added: incubator/qpid/trunk/qpid/java/common/Invoker.tpl
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/Invoker.tpl?rev=648692&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/Invoker.tpl (added)
+++ incubator/qpid/trunk/qpid/java/common/Invoker.tpl Wed Apr 16 06:32:13 2008
@@ -0,0 +1,41 @@
+package org.apache.qpidity.transport;
+
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+public abstract class Invoker {
+
+    protected abstract void invoke(Method method);
+    protected abstract <T> Future<T> invoke(Method method, Class<T> resultClass);
+
+${
+from genutil import *
+
+for c in composites:
+  name = cname(c)
+  fields = get_fields(c)
+  params = get_parameters(fields)
+  args = get_arguments(fields)
+  result = c["result"]
+  if result:
+    if not result["@type"]:
+      rname = cname(result["struct"])
+    else:
+      rname = cname(result, "@type")
+    jresult = "Future<%s>" % rname
+    jreturn = "return "
+    jclass = ", %s.class" % rname
+  else:
+    jresult = "void"
+    jreturn = ""
+    jclass = ""
+
+  out("""
+     public $jresult $(dromedary(name))($(", ".join(params))) {
+         $(jreturn)invoke(new $name($(", ".join(args)))$jclass);
+     }
+""")
+}
+
+}

Added: incubator/qpid/trunk/qpid/java/common/MethodDelegate.tpl
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/MethodDelegate.tpl?rev=648692&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/MethodDelegate.tpl (added)
+++ incubator/qpid/trunk/qpid/java/common/MethodDelegate.tpl Wed Apr 16 06:32:13 2008
@@ -0,0 +1,12 @@
+package org.apache.qpidity.transport;
+
+public abstract class MethodDelegate<C> {
+
+${
+from genutil import *
+
+for c in composites:
+  name = cname(c)
+  out("    public void $(dromedary(name))(C context, $name struct) {}\n")
+}
+}

Added: incubator/qpid/trunk/qpid/java/common/Option.tpl
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/Option.tpl?rev=648692&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/Option.tpl (added)
+++ incubator/qpid/trunk/qpid/java/common/Option.tpl Wed Apr 16 06:32:13 2008
@@ -0,0 +1,19 @@
+package org.apache.qpidity.transport;
+
+public enum Option {
+
+${
+from genutil import *
+
+options = {}
+
+for c in composites:
+  for f in c.query["field"]:
+    t = resolve_type(f)
+    if t["@name"] == "bit":
+      option = scream(f["@name"])
+      if not options.has_key(option):
+        options[option] = None
+        out("    $option,\n")}
+    NO_OPTION
+}

Added: incubator/qpid/trunk/qpid/java/common/StructFactory.tpl
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/StructFactory.tpl?rev=648692&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/StructFactory.tpl (added)
+++ incubator/qpid/trunk/qpid/java/common/StructFactory.tpl Wed Apr 16 06:32:13 2008
@@ -0,0 +1,39 @@
+package org.apache.qpidity.transport;
+
+class StructFactory {
+
+    public static Struct create(int type)
+    {
+        switch (type)
+        {
+${
+from genutil import *
+
+fragment = """        case $name.TYPE:
+            return new $name();
+"""
+
+for c in composites:
+  name = cname(c)
+  if c.name == "struct":
+    out(fragment)
+}        default:
+            throw new IllegalArgumentException("type: " + type);
+        }
+    }
+
+    public static Struct createInstruction(int type)
+    {
+        switch (type)
+        {
+${
+for c in composites:
+  name = cname(c)
+  if c.name in ("command", "control"):
+    out(fragment)
+}        default:
+            throw new IllegalArgumentException("type: " + type);
+        }
+    }
+
+}

Added: incubator/qpid/trunk/qpid/java/common/Type.tpl
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/Type.tpl?rev=648692&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/Type.tpl (added)
+++ incubator/qpid/trunk/qpid/java/common/Type.tpl Wed Apr 16 06:32:13 2008
@@ -0,0 +1,63 @@
+package org.apache.qpidity.transport;
+
+${from genutil import *}
+
+public enum Type
+{
+
+${
+types = spec.query["amqp/type"] + spec.query["amqp/class/type"]
+codes = {}
+first = True
+for t in types:
+  code = t["@code"]
+  fix_width = t["@fixed-width"]
+  var_width = t["@variable-width"]
+
+  if code is None:
+    continue
+
+  if fix_width is None:
+    width = var_width
+    fixed = "false"
+  else:
+    width = fix_width
+    fixed = "true"
+
+  name = scream(t["@name"])
+  codes[code] = name
+
+  if first:
+    first = False
+  else:
+    out(",\n")
+
+  out("    $name((byte) $code, $width, $fixed)")
+};
+
+    // Per-constant attributes. They are assigned exactly once, by the
+    // constructor below, so they are declared final to keep the enum
+    // constants immutable.
+    public final byte code;
+    public final int width;
+    public final boolean fixed;
+
+    Type(byte code, int width, boolean fixed)
+    {
+        this.code = code;
+        this.width = width;
+        this.fixed = fixed;
+    }
+
+    public static Type get(byte code)
+    {
+        switch (code)
+        {
+${
+keys = list(codes.keys())
+keys.sort()
+
+for code in keys:
+  out("        case (byte) $code: return $(codes[code]);\n")
+}
+        default: return null;
+        }
+    }
+}

Modified: incubator/qpid/trunk/qpid/java/common/build.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/build.xml?rev=648692&r1=648691&r2=648692&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/build.xml (original)
+++ incubator/qpid/trunk/qpid/java/common/build.xml Wed Apr 16 06:32:13 2008
@@ -34,7 +34,7 @@
 
   <target name="check_jython_deps">
     <uptodate property="jython.notRequired" targetfile="${jython.timestamp}">
-      <srcfiles dir="${xml.spec.dir}" includes="amqp.0-10-preview.xml" />
+      <srcfiles dir="${xml.spec.dir}" includes="amqp.0-10-qpid-errata.xml" />
     </uptodate>
   </target>
 
@@ -42,10 +42,9 @@
     <java classname="org.python.util.jython" fork="true" failonerror="true">
       <arg value="-Dpython.cachedir.skip=true"/>
       <arg value="-Dpython.path=${basedir}/jython-lib.jar/Lib${path.separator}${mllib.dir}"/>
-      <arg value="${basedir}/generate"/>
+      <arg value="${basedir}/codegen"/>
       <arg value="${module.precompiled}"/>
-      <arg value="org.apache.qpidity.transport"/>
-      <arg value="${xml.spec.dir}/amqp.0-10-preview.xml"/>
+      <arg value="${xml.spec.dir}/amqp.0-10-qpid-errata.xml"/>
       <classpath>
         <pathelement location="jython-2.2-rc2.jar"/>
       </classpath>

Added: incubator/qpid/trunk/qpid/java/common/codegen
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/codegen?rev=648692&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/codegen (added)
+++ incubator/qpid/trunk/qpid/java/common/codegen Wed Apr 16 06:32:13 2008
@@ -0,0 +1,63 @@
+#!/usr/bin/env python
+
+import os, sys, mllib
+from templating import Parser
+from genutil import *
+
+# Command line arguments: <output directory> <spec file>.
+out_dir = sys.argv[1]
+spec_file = sys.argv[2]
+# Directory that execute() writes the generated files into.
+pkg_dir = os.path.join(out_dir, "org/apache/qpidity/transport")
+
+# Create the package directory on first run.
+if not os.path.exists(pkg_dir):
+  os.makedirs(pkg_dir)
+
+# Parsed spec document; every template below is driven from it.
+spec = mllib.xml_parse(spec_file)
+
+def excludes(nd):
+  if (nd.parent is not None and
+      nd.parent.name == "class" and
+      nd.parent["@name"] in ("file", "stream")):
+    return False
+  else:
+    return True
+
+def execute(output, template, **kwargs):
+  f = open(template)
+  input = f.read()
+  f.close()
+  p = Parser(**kwargs)
+  p.parse(input)
+  fname = os.path.join(pkg_dir, output)
+  f = open(fname, "w")
+  f.write(p.output)
+  f.close()
+
+# Outputs that are rendered from the parsed spec alone.
+execute("Type.java", "Type.tpl", spec = spec)
+execute("Constant.java", "Constant.tpl", spec = spec)
+
+# Gather the struct, control and command nodes. excludes is passed as a
+# filter so nodes sitting directly under the "file" and "stream" classes
+# are left out (how mllib applies the filter along the query path is not
+# visible here -- TODO confirm).
+structs = spec.query["amqp/struct"] + \
+    spec.query["amqp/class/struct", excludes] + \
+    spec.query["amqp/class/command/result/struct", excludes]
+controls = spec.query["amqp/class/control", excludes]
+commands = spec.query["amqp/class/command", excludes]
+
+composites = structs + controls + commands
+
+# One generated Java file per composite, named after cname(c).
+for c in composites:
+  name = cname(c)
+  execute("%s.java" % name, "Composite.tpl", type = c, name = name)
+
+# Aggregate files built from the composite list; Invoker is given only
+# the controls and commands.
+execute("MethodDelegate.java", "MethodDelegate.tpl", composites = composites)
+execute("Option.java", "Option.tpl", composites = composites)
+execute("Invoker.java", "Invoker.tpl", composites = controls + commands)
+execute("StructFactory.java", "StructFactory.tpl", composites = composites)
+
+def is_enum(nd):
+  return nd["enum"] is not None
+
+# Domains (top-level and per-class) accepted by the is_enum filter.
+enums = spec.query["amqp/domain", is_enum] + \
+    spec.query["amqp/class/domain", is_enum]
+
+# One generated Java file per enum domain.
+for e in enums:
+  name = cname(e)
+  execute("%s.java" % name, "Enum.tpl", name = name, type = e)

Propchange: incubator/qpid/trunk/qpid/java/common/codegen
------------------------------------------------------------------------------
    svn:executable = *

Added: incubator/qpid/trunk/qpid/java/common/genutil.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/genutil.py?rev=648692&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/genutil.py (added)
+++ incubator/qpid/trunk/qpid/java/common/genutil.py Wed Apr 16 06:32:13 2008
@@ -0,0 +1,207 @@
+
+def camel(offset, *args):
+  parts = []
+  for a in args:
+    parts.extend(a.split("-"))
+  return "".join(parts[:offset] + [p[0].upper() + p[1:] for p in parts[offset:]])
+
+def dromedary(s):
+  return s[0].lower() + s[1:]
+
+def scream(*args):
+  return "_".join([a.replace("-", "_").upper() for a in args])
+
+def num(x):
+  if x is not None and x != "":
+    return int(x, 0)
+  else:
+    return None
+
+def klass(nd):
+  parent = nd.parent
+  while parent is not None:
+    if hasattr(parent, "name") and parent.name == "class":
+      return parent
+    parent = parent.parent
+
+untyped = -1
+
+def code(nd):
+  global untyped
+  cd = num(nd["@code"])
+  if cd is None:
+    cd = untyped
+    untyped -= 1
+    return cd
+
+  cls = klass(nd)
+  if cls:
+    cd |= (num(cls["@code"]) << 8)
+  return cd
+
+def root(nd):
+  if nd.parent is None:
+    return nd
+  else:
+    return root(nd.parent)
+
+def qname(nd):
+  name = nd["@name"]
+  cls = klass(nd)
+  if cls != None:
+    return "%s.%s" % (cls["@name"], name)
+  else:
+    return name
+
+def resolve(node, name):
+  spec = root(node)
+  cls = klass(node)
+  if cls:
+    for nd in cls.query["#tag"]:
+      if nd["@name"] == name:
+        return nd
+  for nd in spec.query["amqp/#tag"] + spec.query["amqp/class/#tag"]:
+    if name == qname(nd):
+      return nd
+  raise Exception("unresolved name: %s" % name)
+
+def resolve_type(nd):
+  name = nd["@type"]
+  type = resolve(nd, name)
+  if type.name == "domain" and not type["enum"]:
+    return resolve_type(type)
+  else:
+    return type
+
+# Spec type name -> Java type name; jtype() looks up the type of every
+# node that is neither a struct nor an enum here.
+TYPES = {
+  "bit": "boolean",
+  "uint8": "short",
+  "uint16": "int",
+  "uint32": "long",
+  "uint64": "long",
+  "datetime": "long",
+  "uuid": "UUID",
+  "sequence-no": "int",
+  "sequence-set": "RangeSet", # XXX
+  "byte-ranges": "RangeSet", # XXX
+  "str8": "String",
+  "str16": "String",
+  "vbin8": "byte[]",
+  "vbin16": "byte[]",
+  "vbin32": "byte[]",
+  "struct32": "Struct",
+  "map": "Map<String,Object>",
+  "array": "List<Object>"
+  }
+
+def cname(nd, field="@name"):
+  cls = klass(nd)
+  if cls:
+    if (nd.name in ("struct", "result") and
+        cls["@name"] != "session" and
+        nd[field] != "header"):
+      return camel(0, nd[field])
+    else:
+      return camel(0, cls["@name"], nd[field])
+  else:
+    return camel(0, nd[field])
+
+def jtype(nd):
+  if nd.name == "struct" or nd["enum"]:
+    return cname(nd)
+  else:
+    return TYPES[nd["@name"]]
+
+REFS = {
+  "boolean": "Boolean",
+  "byte": "Byte",
+  "short": "Short",
+  "int": "Integer",
+  "long": "Long",
+  "float": "Float",
+  "double": "Double",
+  "char": "Character"
+}
+
+def jref(jt):
+  return REFS.get(jt, jt)
+
+def jclass(jt):
+  idx = jt.find('<')
+  if idx > 0:
+    return jt[:idx]
+  else:
+    return jt
+
+# Default value written into generated code for each Java primitive
+# type; Field falls back to "null" for any type not listed here.
+DEFAULTS = {
+  "long": 0,
+  "int": 0,
+  "short": 0,
+  "byte": 0,
+  "char": 0,
+  "boolean": "false"
+  }
+
+class Field:
+
+  def __init__(self, index, nd):
+    self.index = index
+    self.name = camel(1, nd["@name"])
+    type_node = resolve_type(nd)
+    tname = cname(type_node)
+    if type_node.name == "struct":
+      self.read = "(%s) dec.readStruct(%s.TYPE)" % (tname, tname)
+      self.write = "enc.writeStruct(%s.TYPE, check(struct).%s)" % (tname, self.name)
+    elif type_node.name == "domain":
+      coder = camel(0, resolve_type(type_node)["@name"])
+      self.read = "%s.get(dec.read%s())" % (tname, coder)
+      self.write = "enc.write%s(check(struct).%s.getValue())" % (coder, self.name)
+    else:
+      coder = camel(0, type_node["@name"])
+      self.read = "dec.read%s()" % coder
+      self.write = "enc.write%s(check(struct).%s)" % (coder, self.name)
+    self.type = jtype(type_node)
+    self.default = DEFAULTS.get(self.type, "null")
+    self.has = camel(1, "has", self.name)
+    self.get = camel(1, "get", self.name)
+    self.set = camel(1, "set", self.name)
+    self.clear = camel(1, "clear", self.name)
+    if self.type == "boolean":
+      self.option = scream(nd["@name"])
+    else:
+      self.option = None
+
+def get_fields(nd):
+  fields = []
+  index = 0
+  for f in nd.query["field"]:
+    fields.append(Field(index, f))
+    index += 1
+  return fields
+
+def get_parameters(fields):
+  params = []
+  options = False
+  for f in fields:
+    if f.option:
+      options = True
+    else:
+      params.append("%s %s" % (f.type, f.name))
+  if options:
+    params.append("Option ... _options")
+  return params
+
+def get_arguments(fields):
+  args = []
+  options = False
+  for f in fields:
+    if f.option:
+      options = True
+    else:
+      args.append(f.name)
+  if options:
+    args.append("_options")
+  return args
+
+def get_options(fields):
+  return [f for f in fields if f.option]

Propchange: incubator/qpid/trunk/qpid/java/common/genutil.py
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/SecurityHelper.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/SecurityHelper.java?rev=648692&r1=648691&r2=648692&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/SecurityHelper.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/SecurityHelper.java Wed Apr 16 06:32:13 2008
@@ -22,6 +22,7 @@
 
 import java.io.UnsupportedEncodingException;
 import java.util.HashSet;
+import java.util.List;
 import java.util.StringTokenizer;
 
 import org.apache.qpidity.security.AMQPCallbackHandler;
@@ -29,13 +30,12 @@
 
 public class SecurityHelper
 {
-    public static String chooseMechanism(String mechanisms) throws UnsupportedEncodingException
+    public static String chooseMechanism(List<Object> mechanisms) throws UnsupportedEncodingException
     {
-        StringTokenizer tokenizer = new StringTokenizer(mechanisms, " ");
         HashSet mechanismSet = new HashSet();
-        while (tokenizer.hasMoreTokens())
+        for (Object m : mechanisms)
         {
-            mechanismSet.add(tokenizer.nextToken());
+            mechanismSet.add(m);
         }
 
         String preferredMechanisms = CallbackHandlerRegistry.getInstance().getMechanisms();

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ToyBroker.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ToyBroker.java?rev=648692&r1=648691&r2=648692&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ToyBroker.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ToyBroker.java Wed Apr 16 06:32:13 2008
@@ -59,7 +59,7 @@
     public void messageAcquire(Session context, MessageAcquire struct)
     {
         System.out.println("\n==================> messageAcquire " );
-        context.messageAcquired(struct.getTransfers());
+        context.executionResult((int) struct.getId(), new Acquired(struct.getTransfers()));
     }
 
     @Override public void queueDeclare(Session ssn, QueueDeclare qd)
@@ -68,16 +68,16 @@
         System.out.println("\n==================> declared queue: " + qd.getQueue() + "\n");
     }
 
-    @Override public void queueBind(Session ssn, QueueBind qb)
+    @Override public void exchangeBind(Session ssn, ExchangeBind qb)
     {
-        exchange.bindQueue(qb.getExchange(), qb.getRoutingKey(),qb.getQueue());
-        System.out.println("\n==================> bound queue: " + qb.getQueue() + " with routing key " + qb.getRoutingKey() + "\n");
+        exchange.bindQueue(qb.getExchange(), qb.getBindingKey(),qb.getQueue());
+        System.out.println("\n==================> bound queue: " + qb.getQueue() + " with binding key " + qb.getBindingKey() + "\n");
     }
 
     @Override public void queueQuery(Session ssn, QueueQuery qq)
     {
         QueueQueryResult result = new QueueQueryResult().queue(qq.getQueue());
-        ssn.executionResult(qq.getId(), result);
+        ssn.executionResult((int) qq.getId(), result);
     }
 
     @Override public void messageSubscribe(Session ssn, MessageSubscribe ms)
@@ -112,7 +112,8 @@
     {
         if (xfr == null || body == null)
         {
-            ssn.connectionClose(503, "no method segment", 0, 0);
+            ssn.connectionClose(ConnectionCloseCode.FRAMING_ERROR,
+                                "no method segment");
             ssn.close();
             return;
         }
@@ -136,7 +137,7 @@
     {
         if (xfr == null || body == null)
         {
-            ssn.connectionClose(503, "no method segment", 0, 0);
+            ssn.connectionClose(ConnectionCloseCode.FRAMING_ERROR, "no method segment");
             ssn.close();
             return;
         }
@@ -174,14 +175,16 @@
         {
             RangeSet ranges = new RangeSet();
             ranges.add(xfr.getId());
-            ssn.messageReject(ranges, 0, "no such destination");
+            ssn.messageReject(ranges, MessageRejectCode.UNROUTABLE,
+                              "no such destination");
         }
     }
 
     private void transferMessageToPeer(Session ssn,String dest, Message m)
     {
         System.out.println("\n==================> Transfering message to: " +dest + "\n");
-        ssn.messageTransfer(dest, (short)0, (short)0);
+        ssn.messageTransfer(dest, MessageAcceptMode.EXPLICIT,
+                            MessageAcquireMode.PRE_ACQUIRED);
         ssn.header(m.header);
         for (Data d : m.body)
         {

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ToyClient.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ToyClient.java?rev=648692&r1=648691&r2=648692&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ToyClient.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ToyClient.java Wed Apr 16 06:32:13 2008
@@ -63,7 +63,7 @@
     public static final void main(String[] args)
     {
         Connection conn = MinaHandler.connect("0.0.0.0", 5672,
-                                              new ConnectionDelegate()
+                                              new ClientDelegate()
                                               {
                                                   public SessionDelegate getSessionDelegate()
                                                   {
@@ -80,9 +80,9 @@
                 TransportConstants.getVersionMinor())));
 
         Channel ch = conn.getChannel(0);
-        Session ssn = new Session();
+        Session ssn = new Session("my-session".getBytes());
         ssn.attach(ch);
-        ssn.sessionOpen(1234);
+        ssn.sessionAttach(ssn.getName());
 
         ssn.queueDeclare("asdf", null, null);
         ssn.sync();
@@ -111,13 +111,15 @@
         map.put("list", Arrays.asList(1, 2, 3));
         map.put("binary", new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10});
 
-        ssn.messageTransfer("asdf", (short) 0, (short) 1);
+        ssn.messageTransfer("asdf", MessageAcceptMode.EXPLICIT,
+                            MessageAcquireMode.PRE_ACQUIRED);
         ssn.header(new DeliveryProperties(),
                    new MessageProperties().setApplicationHeaders(map));
         ssn.data("this is the data");
         ssn.endData();
 
-        ssn.messageTransfer("fdsa", (short) 0, (short) 1);
+        ssn.messageTransfer("fdsa", MessageAcceptMode.EXPLICIT,
+                            MessageAcquireMode.PRE_ACQUIRED);
         ssn.data("this should be rejected");
         ssn.endData();
         ssn.sync();

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/dtx/XidImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/dtx/XidImpl.java?rev=648692&r1=648691&r2=648692&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/dtx/XidImpl.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/dtx/XidImpl.java Wed Apr 16 06:32:13 2008
@@ -241,28 +241,10 @@
      * @return The String representation of this Xid
      * @throws QpidException In case of problem when converting this Xid into a string.
      */
-    public static String convertToString(Xid xid) throws QpidException
+    public static org.apache.qpidity.transport.Xid convert(Xid xid) throws QpidException
     {
-        if (_logger.isDebugEnabled())
-        {
-            _logger.debug("converting " + xid + " into a String");
-        }
-        try
-        {
-            ByteArrayOutputStream res = new ByteArrayOutputStream();
-            DataOutputStream out = new DataOutputStream(res);
-            out.writeLong(xid.getFormatId());
-            byte[] txId = xid.getGlobalTransactionId();
-            byte[] brId = xid.getBranchQualifier();
-            out.writeByte(txId.length);
-            out.writeByte(brId.length);
-            out.write(txId);
-            out.write(brId);
-            return res.toString();
-        }
-        catch (IOException e)
-        {
-            throw new QpidException("cannot convert the xid " + xid + " into a String", null, e);
-        }
+        return new org.apache.qpidity.transport.Xid(xid.getFormatId(),
+                                                    xid.getGlobalTransactionId(),
+                                                    xid.getBranchQualifier());
     }
 }

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Channel.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Channel.java?rev=648692&r1=648691&r2=648692&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Channel.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Channel.java Wed Apr 16 06:32:13 2008
@@ -77,7 +77,7 @@
         connection.getConnectionDelegate().init(this, hdr);
     }
 
-    public void method(Void v, Method method)
+    public void control(Void v, Method method)
     {
         switch (method.getEncodedTrack())
         {
@@ -90,13 +90,15 @@
         case L3:
             method.delegate(session, sessionDelegate);
             break;
-        case L4:
-            method.delegate(session, sessionDelegate);
-            break;
         default:
             throw new IllegalStateException
                 ("unknown track: " + method.getEncodedTrack());
         }
+    }
+
+    public void command(Void v, Method method)
+    {
+        method.delegate(session, sessionDelegate);
     }
 
     public void header(Void v, Header header)

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/ChannelDelegate.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/ChannelDelegate.java?rev=648692&r1=648691&r2=648692&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/ChannelDelegate.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/ChannelDelegate.java Wed Apr 16 06:32:13 2008
@@ -32,15 +32,14 @@
 class ChannelDelegate extends MethodDelegate<Channel>
 {
 
-    public @Override void sessionOpen(Channel channel, SessionOpen open)
+    public @Override void sessionAttach(Channel channel, SessionAttach atch)
     {
-        Session ssn = new Session();
+        Session ssn = new Session(atch.getName());
         ssn.attach(channel);
-        long lifetime = open.getDetachedLifetime();
-        ssn.sessionAttached(UUID.randomUUID(), lifetime);
+        ssn.sessionAttached(ssn.getName());
     }
 
-    public @Override void sessionClosed(Channel channel, SessionClosed closed)
+    public @Override void sessionDetached(Channel channel, SessionDetached closed)
     {
         channel.getSession().closed();
         // XXX: should we remove the channel from the connection? It

Added: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/ClientDelegate.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/ClientDelegate.java?rev=648692&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/ClientDelegate.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/ClientDelegate.java Wed Apr 16 06:32:13 2008
@@ -0,0 +1,41 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpidity.transport;
+
+
+/**
+ * ClientDelegate: a ConnectionDelegate whose init() only validates the protocol header.
+ * init() throws a RuntimeException unless major AND minor match TransportConstants.
+ */
+
+public abstract class ClientDelegate extends ConnectionDelegate
+{
+    // A header is a mismatch when EITHER the major or the minor version differs.
+    public void init(Channel ch, ProtocolHeader hdr)
+    {
+        if (hdr.getMajor() != TransportConstants.getVersionMajor() ||
+            hdr.getMinor() != TransportConstants.getVersionMinor())
+        {
+            throw new RuntimeException("version missmatch: " + hdr);
+        }
+    }
+
+}

Propchange: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/ClientDelegate.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/ConnectionDelegate.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/ConnectionDelegate.java?rev=648692&r1=648691&r2=648692&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/ConnectionDelegate.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/ConnectionDelegate.java Wed Apr 16 06:32:13 2008
@@ -26,7 +26,10 @@
 import org.apache.qpidity.QpidException;
 
 import java.io.UnsupportedEncodingException;
+
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
@@ -79,17 +82,27 @@
 
     public void init(Channel ch, ProtocolHeader hdr)
     {
-        // XXX: hardcoded version
-        if (hdr.getMajor() != 0 && hdr.getMinor() != 10)
+        ch.getConnection().send(new ConnectionEvent(0, new ProtocolHeader
+                                                    (1,
+                                                     TransportConstants.getVersionMajor(),
+                                                     TransportConstants.getVersionMinor())));
+        if (hdr.getMajor() != TransportConstants.getVersionMajor() ||
+            hdr.getMinor() != TransportConstants.getVersionMinor()) // mismatch if either field differs
         {
             // XXX
-            ch.getConnection().send(new ConnectionEvent(0, new ProtocolHeader(1, TransportConstants.getVersionMajor(),
-                    TransportConstants.getVersionMinor())));
+            ch.getConnection().send(new ConnectionEvent(0, new ProtocolHeader
+                                                        (1,
+                                                         TransportConstants.getVersionMajor(),
+                                                         TransportConstants.getVersionMinor())));
             ch.getConnection().close();
         }
         else
         {
-            ch.connectionStart(hdr.getMajor(), hdr.getMinor(), null, "PLAIN", "utf8");
+            List<Object> plain = new ArrayList<Object>();
+            plain.add("PLAIN");
+            List<Object> utf8 = new ArrayList<Object>();
+            utf8.add("utf8");
+            ch.connectionStart(null, plain, utf8);
         }
     }
 
@@ -99,13 +112,13 @@
     @Override public void connectionStart(Channel context, ConnectionStart struct)
     {
         String mechanism = null;
-        String response = null;
+        byte[] response = null;
         try
         {
             mechanism = SecurityHelper.chooseMechanism(struct.getMechanisms());
             saslClient = Sasl.createSaslClient(new String[]{ mechanism },null, "AMQP", "localhost", null,
                                                   SecurityHelper.createCallbackHandler(mechanism,_username,_password ));
-            response = new String(saslClient.evaluateChallenge(new byte[0]),_locale);
+            response = saslClient.evaluateChallenge(new byte[0]);
         }
         catch (UnsupportedEncodingException e)
         {
@@ -128,13 +141,9 @@
     {
         try
         {
-            String response = new String(saslClient.evaluateChallenge(struct.getChallenge().getBytes()),_locale);
+            byte[] response = saslClient.evaluateChallenge(struct.getChallenge());
             context.connectionSecureOk(response);
         }
-        catch (UnsupportedEncodingException e)
-        {
-           // need error handling
-        }
         catch (SaslException e)
         {
           // need error handling
@@ -144,14 +153,14 @@
     @Override public void connectionTune(Channel context, ConnectionTune struct)
     {
         // should update the channel max given by the broker.
-        context.connectionTuneOk(struct.getChannelMax(), struct.getFrameMax(), struct.getHeartbeat());
+        context.connectionTuneOk(struct.getChannelMax(), struct.getMaxFrameSize(), struct.getHeartbeatMax());
         context.connectionOpen(_virtualHost, null, Option.INSIST);
     }
 
 
     @Override public void connectionOpenOk(Channel context, ConnectionOpenOk struct)
     {
-        String knownHosts = struct.getKnownHosts();
+        List<Object> knownHosts = struct.getKnownHosts();
         if(_negotiationCompleteLock != null)
         {
             _negotiationCompleteLock.lock();
@@ -187,13 +196,13 @@
             byte[] challenge = null;
             if ( challenge == null)
             {
-                context.connectionTune(Integer.MAX_VALUE,maxFrame, 0);
+                context.connectionTune(Integer.MAX_VALUE, maxFrame, 0, Integer.MAX_VALUE);
             }
             else
             {
                 try
                 {
-                    context.connectionSecure(new String(challenge,_locale));
+                    context.connectionSecure(challenge);
                 }
                 catch(Exception e)
                 {
@@ -218,16 +227,16 @@
         try
         {
             saslServer = Sasl.createSaslServer(_mechanism, "AMQP", "ABC",new HashMap(),SecurityHelper.createCallbackHandler(_mechanism,_username,_password));
-            byte[] challenge = saslServer.evaluateResponse(struct.getResponse().getBytes());
+            byte[] challenge = saslServer.evaluateResponse(struct.getResponse());
             if ( challenge == null)
             {
-                context.connectionTune(Integer.MAX_VALUE,maxFrame, 0);
+                context.connectionTune(Integer.MAX_VALUE, maxFrame, 0, Integer.MAX_VALUE);
             }
             else
             {
                 try
                 {
-                    context.connectionSecure(new String(challenge,_locale));
+                    context.connectionSecure(challenge);
                 }
                 catch(Exception e)
                 {
@@ -250,8 +259,9 @@
 
     @Override public void connectionOpen(Channel context, ConnectionOpen struct)
     {
-       String hosts = "amqp:1223243232325";
-       context.connectionOpenOk(hosts);
+        List<Object> hosts = new ArrayList<Object>();
+        hosts.add("amqp:1223243232325");
+        context.connectionOpenOk(hosts);
     }
 
     public String getPassword()

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Data.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Data.java?rev=648692&r1=648691&r2=648692&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Data.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Data.java Wed Apr 16 06:32:13 2008
@@ -93,7 +93,7 @@
             {
                 str.append(" | ");
             }
-            str.append(str(buf, 20));
+            str.append(str(buf));
         }
         str.append(")");
         return str.toString();

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Method.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Method.java?rev=648692&r1=648691&r2=648692&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Method.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Method.java Wed Apr 16 06:32:13 2008
@@ -20,6 +20,7 @@
  */
 package org.apache.qpidity.transport;
 
+import org.apache.qpidity.transport.network.Frame;
 
 /**
  * Method
@@ -34,11 +35,12 @@
     {
         // XXX: should generate separate factories for separate
         // namespaces
-        return (Method) Struct.create(type);
+        return (Method) StructFactory.createInstruction(type);
     }
 
     // XXX: command subclass?
     private long id;
+    private boolean sync = false;
 
     public final long getId()
     {
@@ -50,6 +52,16 @@
         this.id = id;
     }
 
+    public final boolean isSync()
+    {
+        return sync;
+    }
+
+    void setSync(boolean value)
+    {
+        this.sync = value;
+    }
+
     public abstract boolean hasPayload();
 
     public abstract byte getEncodedTrack();
@@ -58,7 +70,14 @@
 
     public <C> void delegate(C context, ProtocolDelegate<C> delegate)
     {
-        delegate.method(context, this);
+        if (getEncodedTrack() == Frame.L4)
+        {
+            delegate.command(context, this);
+        }
+        else
+        {
+            delegate.control(context, this);
+        }
     }
 
 }

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/ProtocolDelegate.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/ProtocolDelegate.java?rev=648692&r1=648691&r2=648692&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/ProtocolDelegate.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/ProtocolDelegate.java Wed Apr 16 06:32:13 2008
@@ -31,7 +31,9 @@
 
     void init(C context, ProtocolHeader header);
 
-    void method(C context, Method method);
+    void control(C context, Method control);
+
+    void command(C context, Method command);
 
     void header(C context, Header header);
 

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/RangeSet.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/RangeSet.java?rev=648692&r1=648691&r2=648692&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/RangeSet.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/RangeSet.java Wed Apr 16 06:32:13 2008
@@ -98,6 +98,13 @@
         ranges.clear();
     }
 
+    public RangeSet copy()
+    {
+        RangeSet copy = new RangeSet();
+        copy.ranges.addAll(ranges);
+        return copy;
+    }
+
     public String toString()
     {
         StringBuffer str = new StringBuffer();

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java?rev=648692&r1=648691&r2=648692&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java Wed Apr 16 06:32:13 2008
@@ -32,6 +32,7 @@
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import static org.apache.qpidity.transport.Option.*;
 
 /**
  * Session
@@ -56,23 +57,34 @@
     private static boolean ENABLE_REPLAY = false;
     private static final Logger log = Logger.get(Session.class);
 
+    private byte[] name;
+    private long timeout = 60000;
+
     // channel may be null
     Channel channel;
 
     // incoming command count
-    private long commandsIn = 0;
+    long commandsIn = 0;
     // completed incoming commands
     private final RangeSet processed = new RangeSet();
-    private long processedMark = -1;
     private Range syncPoint = null;
 
     // outgoing command count
     private long commandsOut = 0;
     private Map<Long,Method> commands = new HashMap<Long,Method>();
-    private long mark = 0;
+    private long maxComplete = -1;
 
     private AtomicBoolean closed = new AtomicBoolean(false);
 
+    public Session(byte[] name)
+    {
+        this.name = name;
+    }
+
+    public byte[] getName()
+    {
+        return name;
+    }
 
     public Map<Long,Method> getOutstandingCommands()
     {
@@ -133,25 +145,12 @@
 
    public void flushProcessed()
     {
-        boolean first = true;
-        RangeSet rest = new RangeSet();
+        RangeSet copy;
         synchronized (processed)
         {
-            for (Range r: processed)
-            {
-                if (first && r.includes(processedMark))
-                {
-                    processedMark = r.getUpper();
-                }
-                else
-                {
-                    rest.add(r);
-                }
-
-                first = false;
-            }
+            copy = processed.copy();
         }
-        executionComplete(processedMark, rest);
+        sessionCompleted(copy);
     }
 
     void syncPoint()
@@ -193,34 +192,34 @@
         log.debug("%s complete(%d, %d)", this, lower, upper);
         synchronized (commands)
         {
-            for (long id = lower; id <= upper; id++)
+            for (long id = maxComplete; id <= upper; id++)
             {
                 commands.remove(id);
             }
+            if (lower <= maxComplete + 1)
+            {
+                maxComplete = Math.max(maxComplete, upper);
+            }
             commands.notifyAll();
             log.debug("%s   commands remaining: %s", this, commands);
         }
     }
 
-    void complete(long mark)
-    {
-        synchronized (commands)
-        {
-            complete(this.mark, mark);
-            this.mark = mark;
-            commands.notifyAll();
-        }
-    }
-
     protected void invoke(Method m)
     {
         if (m.getEncodedTrack() == Frame.L4)
         {
             synchronized (commands)
             {
-                // You only need to keep the command if you need command level replay.
-                // If not we only need to keep track of commands to make sync work
-                commands.put(commandsOut++,(ENABLE_REPLAY?m:null));
+                long next = commandsOut++;
+                if (next == 0)
+                {
+                    sessionCommandPoint(0, 0);
+                }
+                if (ENABLE_REPLAY)
+                {
+                    commands.put(next, m);
+                }
                 channel.method(m);
             }
         }
@@ -230,7 +229,7 @@
         }
     }
 
-     public void header(Header header)
+    public void header(Header header)
     {
         channel.header(header);
     }
@@ -269,21 +268,32 @@
 
     public void sync()
     {
+        sync(timeout);
+    }
+
+    public void sync(long timeout)
+    {
         log.debug("%s sync()", this);
         synchronized (commands)
         {
             long point = commandsOut - 1;
 
-            if (mark < point)
+            if (maxComplete < point)
             {
-                executionSync();
+                ExecutionSync sync = new ExecutionSync();
+                sync.setSync(true);
+                invoke(sync);
             }
 
-            while (!closed.get() && mark < point)
+            long start = System.currentTimeMillis();
+            long elapsed = 0;
+            while (!closed.get() && elapsed < timeout && maxComplete < point)
             {
                 try {
-                    log.debug("%s   waiting for[%d]: %s", this, point, commands);
-                    commands.wait();
+                    log.debug("%s   waiting for[%d]: %d, %s", this, point,
+                              maxComplete, commands);
+                    commands.wait(timeout - elapsed);
+                    elapsed = System.currentTimeMillis() - start;
                 }
                 catch (InterruptedException e)
                 {
@@ -291,9 +301,16 @@
                 }
             }
 
-            if (mark < point)
+            if (maxComplete < point)
             {
-                throw new RuntimeException("session closed");
+                if (closed.get())
+                {
+                    throw new RuntimeException("session closed");
+                }
+                else
+                {
+                    throw new RuntimeException("timed out waiting for sync");
+                }
             }
         }
     }
@@ -346,16 +363,19 @@
             }
         }
 
-        public T get(long timeout, int nanos)
+        public T get(long timeout)
         {
             synchronized (this)
             {
-                while (!closed.get() && !isDone())
+                long start = System.currentTimeMillis();
+                long elapsed = 0;
+                while (!closed.get() && timeout - elapsed > 0 && !isDone())
                 {
                     try
                     {
                         log.debug("%s waiting for result: %s", Session.this, this);
-                        wait(timeout, nanos);
+                        wait(timeout - elapsed);
+                        elapsed = System.currentTimeMillis() - start;
                     }
                     catch (InterruptedException e)
                     {
@@ -364,22 +384,23 @@
                 }
             }
 
-            if (!isDone())
+            if (isDone())
+            {
+                return result;
+            }
+            else if (closed.get())
             {
                 throw new RuntimeException("session closed");
             }
-
-            return result;
-        }
-
-        public T get(long timeout)
-        {
-            return get(timeout, 0);
+            else
+            {
+                return null;
+            }
         }
 
         public T get()
         {
-            return get(0);
+            return get(timeout);
         }
 
         public boolean isDone()
@@ -396,7 +417,8 @@
 
     public void close()
     {
-        sessionClose();
+        sessionRequestTimeout(0);
+        sessionDetach(name);
         // XXX: channel.close();
     }
 

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/SessionDelegate.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/SessionDelegate.java?rev=648692&r1=648691&r2=648692&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/SessionDelegate.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/SessionDelegate.java Wed Apr 16 06:32:13 2008
@@ -35,20 +35,16 @@
 {
     public void init(Session ssn, ProtocolHeader hdr) { }
 
-    public void method(Session ssn, Method method) {
-        if (method.getEncodedTrack() == Frame.L4)
-        {
-            method.setId(ssn.nextCommandId());
-        }
-
+    public void control(Session ssn, Method method) {
         method.dispatch(ssn, this);
+    }
 
-        if (method.getEncodedTrack() == Frame.L4)
+    public void command(Session ssn, Method method) {
+        method.setId(ssn.nextCommandId());
+        method.dispatch(ssn, this);
+        if (!method.hasPayload())
         {
-            if (!method.hasPayload())
-            {
-                ssn.processed(method);
-            }
+            ssn.processed(method);
         }
     }
 
@@ -60,12 +56,12 @@
 
     @Override public void executionResult(Session ssn, ExecutionResult result)
     {
-        ssn.result(result.getCommandId(), result.getData());
+        ssn.result(result.getCommandId(), result.getValue());
     }
 
-    @Override public void executionComplete(Session ssn, ExecutionComplete excmp)
+    @Override public void sessionCompleted(Session ssn, SessionCompleted cmp)
     {
-        RangeSet ranges = excmp.getRangedExecutionSet();
+        RangeSet ranges = cmp.getCommands();
         if (ranges != null)
         {
             for (Range range : ranges)
@@ -73,12 +69,27 @@
                 ssn.complete(range.getLower(), range.getUpper());
             }
         }
-        ssn.complete(excmp.getCumulativeExecutionMark());
     }
 
-    @Override public void executionFlush(Session ssn, ExecutionFlush flush)
+    @Override public void sessionFlush(Session ssn, SessionFlush flush)
+    {
+        if (flush.getCompleted())
+        {
+            ssn.flushProcessed();
+        }
+        if (flush.getConfirmed())
+        {
+            throw new Error("not implemented");
+        }
+        if (flush.getExpected())
+        {
+            throw new Error("not implemented");
+        }
+    }
+
+    @Override public void sessionCommandPoint(Session ssn, SessionCommandPoint scp)
     {
-        ssn.flushProcessed();
+        ssn.commandsIn = scp.getCommandId();
     }
 
     @Override public void executionSync(Session ssn, ExecutionSync sync)

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Struct.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Struct.java?rev=648692&r1=648691&r2=648692&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Struct.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Struct.java Wed Apr 16 06:32:13 2008
@@ -71,8 +71,6 @@
         return type;
     }
 
-    public abstract boolean hasTicket();
-
     private final boolean isBit(Field<?,?> f)
     {
         return f.getType().equals(Boolean.class);
@@ -80,14 +78,7 @@
 
     private final boolean packed()
     {
-        if (this instanceof Method)
-        {
-            return false;
-        }
-        else
-        {
-            return true;
-        }
+        return getPackWidth() > 0;
     }
 
     private final boolean encoded(Field<?,?> f)
@@ -147,11 +138,6 @@
             }
         }
 
-        if (hasTicket())
-        {
-            dec.readShort();
-        }
-
         for (Field<?,?> f : fields)
         {
             if (encoded(f))
@@ -185,11 +171,6 @@
             {
                 enc.writeBit(false);
             }
-        }
-
-        if (hasTicket())
-        {
-            enc.writeShort(0x0);
         }
 
         for (Field<?,?> f : fields)

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/TransportConstants.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/TransportConstants.java?rev=648692&r1=648691&r2=648692&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/TransportConstants.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/TransportConstants.java Wed Apr 16 06:32:13 2008
@@ -2,8 +2,9 @@
 
 public class TransportConstants
 {
-    private static byte _protocol_version_minor = 0;
-    private static byte _protocol_version_major = 99;
+
+    private static byte _protocol_version_minor = 10;
+    private static byte _protocol_version_major = 0;
 
     public static void setVersionMajor(byte value)
     {
@@ -24,4 +25,5 @@
     {
         return _protocol_version_minor;
     }
+
 }



Mime
View raw message