qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From raj...@apache.org
Subject svn commit: r1144531 - in /qpid/trunk/qpid/java: client/src/main/java/org/apache/qpid/client/ common/src/main/java/org/apache/qpid/configuration/ common/src/main/java/org/apache/qpid/transport/
Date Fri, 08 Jul 2011 22:45:10 GMT
Author: rajith
Date: Fri Jul  8 22:45:09 2011
New Revision: 1144531

URL: http://svn.apache.org/viewvc?rev=1144531&view=rev
Log:
QPID-3269
In order to verify the uniqueness of the client ID, a dummy session is
created using client ID as it's name. This prevents any other connection
from using same client ID as the session creation will fail. However
this verification is switched off by default in order to preserve
backwards compatibility. You need to use -Dqpid.verify_client_id=true
switch verification on.

Modified:
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?rev=1144531&r1=1144530&r2=1144531&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java Fri
Jul  8 22:45:09 2011
@@ -173,8 +173,8 @@ public class AMQConnection extends Close
     //Indicates the sync publish options (persistent|all)
     //By default it's async publish
     private String _syncPublish = "";
-    
-    // Indicates whether to use the old map message format or the 
+
+    // Indicates whether to use the old map message format or the
     // new amqp-0-10 encoded format.
     private boolean _useLegacyMapMessageFormat;
 
@@ -261,7 +261,7 @@ public class AMQConnection extends Close
         {
             throw new IllegalArgumentException("Connection must be specified");
         }
-        
+
         // set this connection maxPrefetch
         if (connectionURL.getOption(ConnectionURL.OPTIONS_MAXPREFETCH) != null)
         {
@@ -311,7 +311,7 @@ public class AMQConnection extends Close
             // use the default value set for all connections
             _syncPublish = System.getProperty((ClientProperties.SYNC_PUBLISH_PROP_NAME),_syncPublish);
         }
-        
+
         if (connectionURL.getOption(ConnectionURL.OPTIONS_USE_LEGACY_MAP_MESSAGE_FORMAT)
!= null)
         {
             _useLegacyMapMessageFormat =  Boolean.parseBoolean(
@@ -322,16 +322,16 @@ public class AMQConnection extends Close
             // use the default value set for all connections
             _useLegacyMapMessageFormat = Boolean.getBoolean(ClientProperties.USE_LEGACY_MAP_MESSAGE_FORMAT);
         }
-        
+
         String amqpVersion = System.getProperty((ClientProperties.AMQP_VERSION), "0-10");
         _logger.debug("AMQP version " + amqpVersion);
-        
+
         _failoverPolicy = new FailoverPolicy(connectionURL, this);
         BrokerDetails brokerDetails = _failoverPolicy.getCurrentBrokerDetails();
-        if ("0-8".equals(amqpVersion)) 
+        if ("0-8".equals(amqpVersion))
         {
             _delegate = new AMQConnectionDelegate_8_0(this);
-        } 
+        }
         else if ("0-9".equals(amqpVersion))
         {
             _delegate = new AMQConnectionDelegate_0_9(this);
@@ -418,6 +418,7 @@ public class AMQConnection extends Close
                 brokerDetails = _failoverPolicy.getNextBrokerDetails();
             }
         }
+        verifyClientID();
 
         if (_logger.isDebugEnabled())
         {
@@ -504,7 +505,7 @@ public class AMQConnection extends Close
             Class partypes[] = new Class[1];
             partypes[0] = AMQConnection.class;
             _delegate = (AMQConnectionDelegate) c.getConstructor(partypes).newInstance(this);
-            //Update our session to use this new protocol version 
+            //Update our session to use this new protocol version
             _protocolHandler.getProtocolSession().setProtocolVersion(_delegate.getProtocolVersion());
 
         }
@@ -1074,7 +1075,7 @@ public class AMQConnection extends Close
     {
         _username = id;
     }
-    
+
     public String getPassword()
     {
         return _password;
@@ -1250,7 +1251,7 @@ public class AMQConnection extends Close
             {
                 je.setLinkedException((Exception) cause);
             }
-            
+
             je.initCause(cause);
         }
 
@@ -1283,7 +1284,7 @@ public class AMQConnection extends Close
             {
                 _logger.info("Not a hard-error connection not closing: " + cause);
             }
-            
+
             // deliver the exception if there is a listener
             if (_exceptionListener != null)
             {
@@ -1293,7 +1294,7 @@ public class AMQConnection extends Close
             {
                 _logger.error("Throwable Received but no listener set: " + cause);
             }
-    
+
             // if we are closing the connection, close sessions first
             if (closer)
             {
@@ -1351,17 +1352,17 @@ public class AMQConnection extends Close
     }
 
     /**
-     * Returns connection url. 
+     * Returns connection url.
      * @return connection url
      */
     public ConnectionURL getConnectionURL()
     {
         return _connectionURL;
     }
-    
+
     /**
      * Returns stringified connection url.   This url is suitable only for display
-     * as {@link AMQConnectionURL#toString()} converts any password to asterisks. 
+     * as {@link AMQConnectionURL#toString()} converts any password to asterisks.
      * @return connection url
      */
     public String toURL()
@@ -1477,9 +1478,24 @@ public class AMQConnection extends Close
     {
         return _sessions.getNextChannelId();
     }
-    
+
     public boolean isUseLegacyMapMessageFormat()
     {
         return _useLegacyMapMessageFormat;
     }
+
+    private void verifyClientID() throws AMQException
+    {
+        if (Boolean.getBoolean(ClientProperties.QPID_VERIFY_CLIENT_ID))
+        {
+            try
+            {
+                _delegate.verifyClientID();
+            }
+            catch(JMSException e)
+            {
+                throw new AMQException(AMQConstant.ALREADY_EXISTS,"ClientID must be unique",e);
+            }
+        }
+    }
 }

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java?rev=1144531&r1=1144530&r2=1144531&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
(original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
Fri Jul  8 22:45:09 2011
@@ -57,10 +57,12 @@ public interface AMQConnectionDelegate
     void closeConnection(long timeout) throws JMSException, AMQException;
 
     <T, E extends Exception> T executeRetrySupport(FailoverProtectedOperation<T,E>
operation) throws E;
-    
+
     int getMaxChannelID();
 
     int getMinChannelID();
 
     ProtocolVersion getProtocolVersion();
+
+    void verifyClientID() throws JMSException;
 }

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java?rev=1144531&r1=1144530&r2=1144531&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
(original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
Fri Jul  8 22:45:09 2011
@@ -47,6 +47,7 @@ import org.apache.qpid.transport.Connect
 import org.apache.qpid.transport.ConnectionListener;
 import org.apache.qpid.transport.ConnectionSettings;
 import org.apache.qpid.transport.ProtocolVersionException;
+import org.apache.qpid.transport.SessionDetachCode;
 import org.apache.qpid.transport.TransportException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -86,7 +87,14 @@ public class AMQConnectionDelegate_0_10 
     /**
      * create a Session and start it if required.
      */
+
     public Session createSession(boolean transacted, int acknowledgeMode, int prefetchHigh,
int prefetchLow)
+    throws JMSException
+    {
+        return createSession(transacted,acknowledgeMode,prefetchHigh,prefetchLow,null);
+    }
+
+    public Session createSession(boolean transacted, int acknowledgeMode, int prefetchHigh,
int prefetchLow, String name)
             throws JMSException
     {
         _conn.checkNotClosed();
@@ -101,7 +109,7 @@ public class AMQConnectionDelegate_0_10 
         try
         {
             session = new AMQSession_0_10(_qpidConnection, _conn, channelId, transacted,
acknowledgeMode, prefetchHigh,
-                                          prefetchLow);
+                    prefetchLow,name);
             _conn.registerSession(channelId, session);
             if (_conn._started)
             {
@@ -449,12 +457,31 @@ public class AMQConnectionDelegate_0_10 
         else
         {
             heartbeat = Integer.getInteger(ClientProperties.HEARTBEAT,ClientProperties.HEARTBEAT_DEFAULT);
-        } 
+        }
         return heartbeat;
     }
-    
+
     protected org.apache.qpid.transport.Connection getQpidConnection()
     {
         return _qpidConnection;
     }
+
+    public void verifyClientID() throws JMSException
+    {
+        int prefetch = (int)_conn.getMaxPrefetch();
+        AMQSession_0_10 ssn = (AMQSession_0_10)createSession(false, 1,prefetch,prefetch,_conn.getClientID());
+        org.apache.qpid.transport.Session ssn_0_10 = ssn.getQpidSession();
+        try
+        {
+            ssn_0_10.awaitOpen();
+        }
+        catch(Exception e)
+        {
+            if (ssn_0_10.getDetachCode() != null &&
+                ssn_0_10.getDetachCode() == SessionDetachCode.SESSION_BUSY)
+            {
+                throw new JMSException("ClientID must be unique");
+            }
+        }
+    }
 }

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java?rev=1144531&r1=1144530&r2=1144531&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
(original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
Fri Jul  8 22:45:09 2011
@@ -332,4 +332,9 @@ public class AMQConnectionDelegate_8_0 i
     {
         return ProtocolVersion.v8_0;
     }
+
+    public void verifyClientID() throws JMSException
+    {
+        // NOOP
+    }
 }

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=1144531&r1=1144530&r2=1144531&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
(original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
Fri Jul  8 22:45:09 2011
@@ -159,13 +159,20 @@ public class AMQSession_0_10 extends AMQ
      */
     AMQSession_0_10(org.apache.qpid.transport.Connection qpidConnection, AMQConnection con,
int channelId,
                     boolean transacted, int acknowledgeMode, MessageFactoryRegistry messageFactoryRegistry,
-                    int defaultPrefetchHighMark, int defaultPrefetchLowMark)
+                    int defaultPrefetchHighMark, int defaultPrefetchLowMark,String name)
     {
 
         super(con, channelId, transacted, acknowledgeMode, messageFactoryRegistry, defaultPrefetchHighMark,
               defaultPrefetchLowMark);
         _qpidConnection = qpidConnection;
-        _qpidSession = _qpidConnection.createSession(1);
+        if (name == null)
+        {
+            _qpidSession = _qpidConnection.createSession(1);
+        }
+        else
+        {
+            _qpidSession = _qpidConnection.createSession(name,1);
+        }
         _qpidSession.setSessionListener(this);
         if (_transacted)
         {
@@ -192,11 +199,12 @@ public class AMQSession_0_10 extends AMQ
      * @param qpidConnection      The connection
      */
     AMQSession_0_10(org.apache.qpid.transport.Connection qpidConnection, AMQConnection con,
int channelId,
-                    boolean transacted, int acknowledgeMode, int defaultPrefetchHigh, int
defaultPrefetchLow)
+                    boolean transacted, int acknowledgeMode, int defaultPrefetchHigh, int
defaultPrefetchLow,
+                    String name)
     {
 
         this(qpidConnection, con, channelId, transacted, acknowledgeMode, MessageFactoryRegistry.newDefaultRegistry(),
-             defaultPrefetchHigh, defaultPrefetchLow);
+             defaultPrefetchHigh, defaultPrefetchLow,name);
     }
 
     private void addUnacked(int id)

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java?rev=1144531&r1=1144530&r2=1144531&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java Fri
Jul  8 22:45:09 2011
@@ -52,7 +52,7 @@ public class XASessionImpl extends AMQSe
     {
         super(qpidConnection, con, channelId, false,  // this is not a transacted session
               Session.AUTO_ACKNOWLEDGE, // the ack mode is transacted
-              MessageFactoryRegistry.newDefaultRegistry(), defaultPrefetchHigh, defaultPrefetchLow);
+              MessageFactoryRegistry.newDefaultRegistry(), defaultPrefetchHigh, defaultPrefetchLow,null);
         createSession();
         _xaResource = new XAResourceImpl(this);
     }

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java?rev=1144531&r1=1144530&r2=1144531&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java
(original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java
Fri Jul  8 22:45:09 2011
@@ -23,7 +23,7 @@ package org.apache.qpid.configuration;
  */
 public class ClientProperties
 {
-  
+
     /**
      * Currently with Qpid it is not possible to change the client ID.
      * If one is not specified upon connection construction, an id is generated automatically.
@@ -68,38 +68,40 @@ public class ClientProperties
      * by the broker in TuneOK it will be used as the heartbeat interval.
      * If not a warning will be printed and the max value specified for
      * heartbeat in TuneOK will be used
-     * 
+     *
      * The default idle timeout is set to 120 secs
      */
     public static final String IDLE_TIMEOUT_PROP_NAME = "idle_timeout";
     public static final long DEFAULT_IDLE_TIMEOUT = 120000;
-    
+
     public static final String HEARTBEAT = "qpid.heartbeat";
     public static final int HEARTBEAT_DEFAULT = 120;
-    
+
     /**
      * This value will be used to determine the default destination syntax type.
      * Currently the two types are Binding URL (java only) and the Addressing format (used
by
-     * all clients). 
+     * all clients).
      */
     public static final String DEST_SYNTAX = "qpid.dest_syntax";
-    
+
     public static final String USE_LEGACY_MAP_MESSAGE_FORMAT = "qpid.use_legacy_map_message";
 
     public static final String AMQP_VERSION = "qpid.amqp.version";
-    
+
+    public static final String QPID_VERIFY_CLIENT_ID = "qpid.verify_client_id";
+
     private static ClientProperties _instance = new ClientProperties();
-    
+
     /*
-    public static final QpidProperty<Boolean>  IGNORE_SET_CLIENTID_PROP_NAME = 
+    public static final QpidProperty<Boolean>  IGNORE_SET_CLIENTID_PROP_NAME =
         QpidProperty.booleanProperty(false,"qpid.ignore_set_client_id","ignore_setclientID");
-    
+
     public static final QpidProperty<Boolean> SYNC_PERSISTENT_PROP_NAME =
         QpidProperty.booleanProperty(false,"qpid.sync_persistence","sync_persistence");
-    
-    
+
+
     public static final QpidProperty<Integer> MAX_PREFETCH_PROP_NAME =
         QpidProperty.intProperty(500,"qpid.max_prefetch","max_prefetch"); */
-    
-    
+
+
 }

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java?rev=1144531&r1=1144530&r2=1144531&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java
(original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java
Fri Jul  8 22:45:09 2011
@@ -95,6 +95,7 @@ public abstract class ConnectionDelegate
         Session ssn = conn.getSession(dtc.getChannel());
         if (ssn != null)
         {
+            ssn.setDetachCode(dtc.getCode());
             conn.unmap(ssn);
             ssn.closed();
         }

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java?rev=1144531&r1=1144530&r2=1144531&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java Fri Jul
 8 22:45:09 2011
@@ -120,7 +120,9 @@ public class Session extends SessionInvo
 
     private Thread resumer = null;
     private boolean transacted = false;
-    
+    private SessionDetachCode detachCode;
+    private final Object stateLock = new Object();
+
     protected Session(Connection connection, Binary name, long expiry)
     {
         this(connection, new SessionDelegate(), name, expiry);
@@ -1045,13 +1047,54 @@ public class Session extends SessionInvo
     {
         return String.format("ssn:%s", name);
     }
-    
+
     public void setTransacted(boolean b) {
         this.transacted = b;
     }
-    
+
     public boolean isTransacted(){
         return transacted;
     }
-    
+
+    public void setDetachCode(SessionDetachCode dtc)
+    {
+        this.detachCode = dtc;
+    }
+
+    public SessionDetachCode getDetachCode()
+    {
+        return this.detachCode;
+    }
+
+    public void awaitOpen()
+    {
+        switch (state)
+        {
+        case NEW:
+            synchronized(stateLock)
+            {
+                Waiter w = new Waiter(stateLock, timeout);
+                while (w.hasTime() && state == NEW)
+                {
+                    w.await();
+                }
+            }
+
+            if (state != OPEN)
+            {
+                throw new SessionException("Timed out waiting for Session to open");
+            }
+        case DETACHED:
+        case CLOSING:
+        case CLOSED:
+            throw new SessionException("Session closed");
+        default :
+            break;
+        }
+    }
+
+    public Object getStateLock()
+    {
+        return stateLock;
+    }
 }

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java?rev=1144531&r1=1144530&r2=1144531&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java
(original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java
Fri Jul  8 22:45:09 2011
@@ -76,6 +76,10 @@ public class SessionDelegate
     @Override public void sessionAttached(Session ssn, SessionAttached atc)
     {
         ssn.setState(Session.State.OPEN);
+        synchronized (ssn.getStateLock())
+        {
+            ssn.getStateLock().notifyAll();
+        }
     }
 
     @Override public void sessionTimeout(Session ssn, SessionTimeout t)
@@ -203,10 +207,18 @@ public class SessionDelegate
     public void closed(Session session)
     {
         log.debug("CLOSED: [%s]", session);
+        synchronized (session.getStateLock())
+        {
+            session.getStateLock().notifyAll();
+        }
     }
 
     public void detached(Session session)
     {
         log.debug("DETACHED: [%s]", session);
+        synchronized (session.getStateLock())
+        {
+            session.getStateLock().notifyAll();
+        }
     }
 }



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message