directory-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kayyag...@apache.org
Subject svn commit: r904176 - /directory/clients/ldap/trunk/ldap-client-api/src/main/java/org/apache/directory/ldap/client/api/LdapConnection.java
Date Thu, 28 Jan 2010 17:28:24 GMT
Author: kayyagari
Date: Thu Jan 28 17:28:23 2010
New Revision: 904176

URL: http://svn.apache.org/viewvc?rev=904176&view=rev
Log:
o replaced global response queues with per operation response queues

Modified:
    directory/clients/ldap/trunk/ldap-client-api/src/main/java/org/apache/directory/ldap/client/api/LdapConnection.java

Modified: directory/clients/ldap/trunk/ldap-client-api/src/main/java/org/apache/directory/ldap/client/api/LdapConnection.java
URL: http://svn.apache.org/viewvc/directory/clients/ldap/trunk/ldap-client-api/src/main/java/org/apache/directory/ldap/client/api/LdapConnection.java?rev=904176&r1=904175&r2=904176&view=diff
==============================================================================
--- directory/clients/ldap/trunk/ldap-client-api/src/main/java/org/apache/directory/ldap/client/api/LdapConnection.java
(original)
+++ directory/clients/ldap/trunk/ldap-client-api/src/main/java/org/apache/directory/ldap/client/api/LdapConnection.java
Thu Jan 28 17:28:23 2010
@@ -185,38 +185,14 @@
     /** A Message ID which is incremented for each operation */
     private AtomicInteger messageId;
     
-    /** A queue used to store the incoming add responses */
-    private BlockingQueue<AddResponse> addResponseQueue;
-    
-    /** A queue used to store the incoming bind responses */
-    private BlockingQueue<BindResponse> bindResponseQueue;
-    
-    /** A queue used to store the incoming compare responses */
-    private BlockingQueue<CompareResponse> compareResponseQueue;
-    
-    /** A queue used to store the incoming delete responses */
-    private BlockingQueue<DeleteResponse> deleteResponseQueue;
-    
-    /** A queue used to store the incoming extended responses */
-    private BlockingQueue<ExtendedResponse> extendedResponseQueue;
-    
-    /** A queue used to store the incoming modify responses */
-    private BlockingQueue<ModifyResponse> modifyResponseQueue;
-    
-    /** A queue used to store the incoming modifyDN responses */
-    private BlockingQueue<ModifyDnResponse> modifyDNResponseQueue;
-    
-    /** A queue used to store the incoming search responses */
-    private BlockingQueue<SearchResponse> searchResponseQueue;
-    
-    /** A queue used to store the incoming intermediate responses */
-    private BlockingQueue<IntermediateResponse> intermediateResponseQueue;
-
     /** a map to hold the response listeners based on the operation id */
     private Map<Integer, OperationResponseListener> listenerMap = new ConcurrentHashMap<Integer,
OperationResponseListener>();
     
     /** a map to hold the ResponseFutures for all operations */
     private Map<Integer, ResponseFuture> futureMap = new ConcurrentHashMap<Integer,
ResponseFuture>();
+
+    /** a map to hold the response queues related to the operations */
+    private Map<Integer, BlockingQueue> respQueueMap = new ConcurrentHashMap<Integer,
BlockingQueue>();
     
     /** list of controls supported by the server */
     private List<String> supportedControls;
@@ -628,17 +604,6 @@
         // Store the container into the session 
         ldapSession.setAttribute( "LDAP-Container", ldapMessageContainer );
         
-        // Create the responses queues
-        addResponseQueue = new LinkedBlockingQueue<AddResponse>();
-        bindResponseQueue = new LinkedBlockingQueue<BindResponse>();
-        compareResponseQueue = new LinkedBlockingQueue<CompareResponse>();
-        deleteResponseQueue = new LinkedBlockingQueue<DeleteResponse>();
-        extendedResponseQueue = new LinkedBlockingQueue<ExtendedResponse>();
-        modifyResponseQueue = new LinkedBlockingQueue<ModifyResponse>();
-        modifyDNResponseQueue = new LinkedBlockingQueue<ModifyDnResponse>();
-        searchResponseQueue = new LinkedBlockingQueue<SearchResponse>();
-        intermediateResponseQueue = new LinkedBlockingQueue<IntermediateResponse>();
-        
         // And return
         return true;
     }
@@ -658,17 +623,6 @@
             ldapSession.close( true );
         }
         
-        // clean the queues
-        addResponseQueue.clear();
-        bindResponseQueue.clear();
-        compareResponseQueue.clear();
-        deleteResponseQueue.clear();
-        extendedResponseQueue.clear();
-        modifyResponseQueue.clear();
-        modifyDNResponseQueue.clear();
-        searchResponseQueue.clear();
-        intermediateResponseQueue.clear();
-        
         // And close the connector if it has been created locally
         if ( localConnector ) 
         {
@@ -739,6 +693,9 @@
         
         message.setProtocolOP( addReqCodec );
         
+        BlockingQueue<AddResponse> addResponseQueue = new LinkedBlockingQueue<AddResponse>();
+        respQueueMap.put( newId, addResponseQueue );
+        
         ResponseFuture addFuture = new ResponseFuture( addResponseQueue );
         futureMap.put( newId, addFuture );
 
@@ -767,7 +724,7 @@
             catch( Exception e )
             {
                 LOG.error( NO_RESPONSE_ERROR );
-                futureMap.remove( newId );
+                removeFromMaps( newId );
                 throw new LdapException( NO_RESPONSE_ERROR, e );
             }
         }
@@ -861,6 +818,7 @@
         // remove the associated listener if any 
         int abandonId = abandonRequest.getAbandonedMessageId();
 
+        respQueueMap.remove( abandonId );
         ResponseFuture rf = futureMap.remove( abandonId );
         OperationResponseListener listener = listenerMap.remove( abandonId );
 
@@ -1181,6 +1139,9 @@
         LOG.debug( "-----------------------------------------------------------------" );
         LOG.debug( "Sending request \n{}", bindMessage );
 
+        BlockingQueue<BindResponse> bindResponseQueue = new LinkedBlockingQueue<BindResponse>();
+        respQueueMap.put( newId, bindResponseQueue );
+        
         // Create a future for this Bind opeation
         BindFuture bindFuture = new BindFuture( bindResponseQueue );
 
@@ -1361,6 +1322,9 @@
         LOG.debug( "-----------------------------------------------------------------" );
         LOG.debug( "Sending request \n{}", searchMessage );
     
+        BlockingQueue<SearchResponse> searchResponseQueue = new LinkedBlockingQueue<SearchResponse>();
+        respQueueMap.put( newId, searchResponseQueue );
+        
         ResponseFuture searchFuture = new ResponseFuture( searchResponseQueue );
         futureMap.put( newId, searchFuture );
 
@@ -1527,6 +1491,7 @@
         }
         
         SearchListener searchListener = null;
+        int messageId = response.getMessageId();
 
         switch ( response.getMessageType() )
         {
@@ -1534,7 +1499,7 @@
                 // Store the response into the responseQueue
                 AddResponseCodec addRespCodec = response.getAddResponse();
                 addRespCodec.addControl( response.getCurrentControl() );
-                addRespCodec.setMessageId( response.getMessageId() );
+                addRespCodec.setMessageId( messageId );
                 
                 futureMap.remove( addRespCodec.getMessageId() );
                 AddListener addListener = ( AddListener ) listenerMap.remove( addRespCodec.getMessageId()
);
@@ -1545,14 +1510,14 @@
                 }
                 else
                 {
-                    addResponseQueue.add( addResp ); 
+                    addToRespQueueAndRemoveQueueRef( messageId, addResp ); 
                 }
                 break;
                 
             case LdapConstants.BIND_RESPONSE: 
                 // Store the response into the responseQueue
                 BindResponseCodec bindResponseCodec = response.getBindResponse();
-                bindResponseCodec.setMessageId( response.getMessageId() );
+                bindResponseCodec.setMessageId( messageId );
                 bindResponseCodec.addControl( response.getCurrentControl() );
                 BindResponse bindResponse = convert( bindResponseCodec );
 
@@ -1568,7 +1533,7 @@
                 else
                 {
                     // Store the response into the responseQueue
-                    bindResponseQueue.add( bindResponse );
+                    addToRespQueueAndRemoveQueueRef( messageId, bindResponse );
                 }
                 
                 break;
@@ -1576,7 +1541,7 @@
             case LdapConstants.COMPARE_RESPONSE :
                 // Store the response into the responseQueue
                 CompareResponseCodec compResCodec = response.getCompareResponse();
-                compResCodec.setMessageId( response.getMessageId() );
+                compResCodec.setMessageId( messageId );
                 compResCodec.addControl( response.getCurrentControl() );
                 CompareResponse compRes = convert( compResCodec );
                 
@@ -1589,7 +1554,7 @@
                 }
                 else
                 {
-                    compareResponseQueue.add( compRes ); 
+                    addToRespQueueAndRemoveQueueRef( messageId, compRes ); 
                 }
                 
                 break;
@@ -1597,7 +1562,7 @@
             case LdapConstants.DEL_RESPONSE :
                 // Store the response into the responseQueue
                 DelResponseCodec delRespCodec = response.getDelResponse();
-                delRespCodec.setMessageId( response.getMessageId() );
+                delRespCodec.setMessageId( messageId );
                 delRespCodec.addControl( response.getCurrentControl() );
                 DeleteResponse delResp = convert( delRespCodec );
                 
@@ -1609,13 +1574,13 @@
                 }
                 else
                 {
-                    deleteResponseQueue.add( delResp ); 
+                    addToRespQueueAndRemoveQueueRef( messageId, delResp ); 
                 }
                 break;
                 
             case LdapConstants.EXTENDED_RESPONSE :
                 ExtendedResponseCodec extResCodec = response.getExtendedResponse();
-                extResCodec.setMessageId( response.getMessageId() );
+                extResCodec.setMessageId( messageId );
                 extResCodec.addControl( response.getCurrentControl() );
                 
                 ExtendedResponse extResponse = convert( extResCodec );
@@ -1628,16 +1593,17 @@
                 else
                 {
                     // Store the response into the responseQueue
-                    extendedResponseQueue.add( extResponse ); 
+                    addToRespQueueAndRemoveQueueRef( messageId, extResponse ); 
                 }
                 
                 break;
                 
+                //FIXME the way we handle the intermediate responses is broken
             case LdapConstants.INTERMEDIATE_RESPONSE:
                 // Store the response into the responseQueue
                 IntermediateResponseCodec intermediateResponseCodec = 
                     response.getIntermediateResponse();
-                intermediateResponseCodec.setMessageId( response.getMessageId() );
+                intermediateResponseCodec.setMessageId( messageId );
                 intermediateResponseCodec.addControl( response.getCurrentControl() );
                 
                 IntermediateResponse intrmResp = convert( intermediateResponseCodec );  
             
@@ -1649,14 +1615,14 @@
                 else
                 {
                     // Store the response into the responseQueue
-                    intermediateResponseQueue.add( intrmResp );
+                    addToRespQueueAndRemoveQueueRef( messageId, intrmResp );
                 }
                 
                 break;
      
             case LdapConstants.MODIFY_RESPONSE :
                 ModifyResponseCodec modRespCodec = response.getModifyResponse();
-                modRespCodec.setMessageId( response.getMessageId() );
+                modRespCodec.setMessageId( messageId );
                 modRespCodec.addControl( response.getCurrentControl() );
                 ModifyResponse modResp = convert( modRespCodec );
 
@@ -1669,7 +1635,7 @@
                 }
                 else
                 {
-                    modifyResponseQueue.add( modResp ); 
+                    addToRespQueueAndRemoveQueueRef( messageId, modResp ); 
                 }
                 break;
                 
@@ -1677,7 +1643,7 @@
                 
                 ModifyDNResponseCodec modDnCodec = response.getModifyDNResponse();
                 modDnCodec.addControl( response.getCurrentControl() );
-                modDnCodec.setMessageId( response.getMessageId() );
+                modDnCodec.setMessageId( messageId );
                 ModifyDnResponse modDnResp = convert( modDnCodec );
                 
                 futureMap.remove( modDnCodec.getMessageId() );
@@ -1689,7 +1655,7 @@
                 else
                 {
                     // Store the response into the responseQueue
-                    modifyDNResponseQueue.add( modDnResp );
+                    addToRespQueueAndRemoveQueueRef( messageId, modDnResp );
                 }
                 break;
                 
@@ -1697,7 +1663,7 @@
                 // Store the response into the responseQueue
                 SearchResultDoneCodec searchResultDoneCodec = 
                     response.getSearchResultDone();
-                searchResultDoneCodec.setMessageId( response.getMessageId() );
+                searchResultDoneCodec.setMessageId( messageId );
                 searchResultDoneCodec.addControl( response.getCurrentControl() );
                 SearchResultDone srchDone = convert( searchResultDoneCodec );
                 
@@ -1710,7 +1676,7 @@
                 }
                 else
                 {
-                    searchResponseQueue.add( srchDone );
+                    addToRespQueueAndRemoveQueueRef( messageId, srchDone );
                 }
                 
                 break;
@@ -1719,7 +1685,7 @@
                 // Store the response into the responseQueue
                 SearchResultEntryCodec searchResultEntryCodec = 
                     response.getSearchResultEntry();
-                searchResultEntryCodec.setMessageId( response.getMessageId() );
+                searchResultEntryCodec.setMessageId( messageId );
                 searchResultEntryCodec.addControl( response.getCurrentControl() );
                 
                 SearchResultEntry srchEntry = convert( searchResultEntryCodec );
@@ -1731,7 +1697,8 @@
                 }
                 else
                 {
-                    searchResponseQueue.add( srchEntry );
+                    // shouldn't call addToRespQueueAndRemoveQueueRef
+                    respQueueMap.get( messageId ).add( srchEntry );
                 }
                 
                 break;
@@ -1740,7 +1707,7 @@
                 // Store the response into the responseQueue
                 SearchResultReferenceCodec searchResultReferenceCodec = 
                     response.getSearchResultReference();
-                searchResultReferenceCodec.setMessageId( response.getMessageId() );
+                searchResultReferenceCodec.setMessageId( messageId );
                 searchResultReferenceCodec.addControl( response.getCurrentControl() );
 
                 SearchResultReference srchRef = convert( searchResultReferenceCodec );
@@ -1751,7 +1718,8 @@
                 }
                 else
                 {
-                    searchResponseQueue.add( srchRef );
+                    // shouldn't call addToRespQueueAndRemoveQueueRef
+                    respQueueMap.get( messageId ).add( srchRef );
                 }
 
                 break;
@@ -1816,6 +1784,9 @@
         modifyMessage.setProtocolOP( modReqCodec );
         setControls( modRequest.getControls(), modifyMessage );
 
+        BlockingQueue<ModifyResponse> modifyResponseQueue = new LinkedBlockingQueue<ModifyResponse>();
+        respQueueMap.put( newId, modifyResponseQueue );
+        
         ResponseFuture modifyFuture = new ResponseFuture( modifyResponseQueue );
         futureMap.put( newId, modifyFuture );
         
@@ -1844,7 +1815,7 @@
             catch( Exception e )
             {
                 LOG.error( NO_RESPONSE_ERROR );
-                futureMap.remove( newId );
+                removeFromMaps( newId );
 
                 throw new LdapException( NO_RESPONSE_ERROR, e );
             }
@@ -1997,6 +1968,9 @@
         modifyDnMessage.setProtocolOP( modDnCodec );
         setControls( modDnRequest.getControls(), modifyDnMessage );
         
+        BlockingQueue<ModifyDnResponse> modifyDNResponseQueue = new LinkedBlockingQueue<ModifyDnResponse>();
+        respQueueMap.put( newId, modifyDNResponseQueue );
+        
         ResponseFuture modifyDNFuture = new ResponseFuture( modifyDNResponseQueue );
         futureMap.put( newId, modifyDNFuture );
         
@@ -2025,7 +1999,7 @@
             catch( Exception e )
             {
                 LOG.error( NO_RESPONSE_ERROR );
-                futureMap.remove( newId );
+                removeFromMaps( newId );
 
                 LdapException ldapException = new LdapException( NO_RESPONSE_ERROR );
                 ldapException.initCause( e );
@@ -2278,6 +2252,9 @@
         deleteMessage.setProtocolOP( delCodec );
         setControls( delRequest.getControls(), deleteMessage );
         
+        BlockingQueue<DeleteResponse> deleteResponseQueue = new LinkedBlockingQueue<DeleteResponse>();
+        respQueueMap.put( newId, deleteResponseQueue );
+        
         ResponseFuture deleteFuture = new ResponseFuture( deleteResponseQueue );
         futureMap.put( newId, deleteFuture );
 
@@ -2307,7 +2284,7 @@
             catch( Exception e )
             {
                 LOG.error( NO_RESPONSE_ERROR );
-                futureMap.remove( newId );
+                removeFromMaps( newId );
 
                 LdapException ldapException = new LdapException( NO_RESPONSE_ERROR );
                 ldapException.initCause( e );
@@ -2512,6 +2489,9 @@
         
         message.setProtocolOP( compareReqCodec );
         
+        BlockingQueue<CompareResponse> compareResponseQueue = new LinkedBlockingQueue<CompareResponse>();
+        respQueueMap.put( newId, compareResponseQueue );
+        
         ResponseFuture compareFuture = new ResponseFuture( compareResponseQueue );
         futureMap.put( newId, compareFuture );
 
@@ -2541,7 +2521,7 @@
             catch( Exception e )
             {
                 LOG.error( NO_RESPONSE_ERROR );
-                futureMap.remove( newId );
+                removeFromMaps( newId );
 
                 throw new LdapException( NO_RESPONSE_ERROR, e );
             }
@@ -2670,6 +2650,9 @@
         
         message.setProtocolOP( extReqCodec );
         
+        BlockingQueue<ExtendedResponse> extendedResponseQueue = new LinkedBlockingQueue<ExtendedResponse>();
+        respQueueMap.put( newId, extendedResponseQueue );
+        
         ResponseFuture extendedFuture = new ResponseFuture( extendedResponseQueue );
         futureMap.put( newId, extendedFuture );
 
@@ -2699,7 +2682,7 @@
             catch( Exception e )
             {
                 LOG.error( NO_RESPONSE_ERROR );
-                futureMap.remove( newId );
+                removeFromMaps( newId );
 
                 throw new LdapException( NO_RESPONSE_ERROR, e );
             }
@@ -2839,15 +2822,44 @@
     private void addControls( LdapMessageCodec codec, AbstractMessage message )
     {
         List<ControlCodec> ccList = codec.getControls();
-     
         if( ccList != null )
         {
             for( ControlCodec cc : ccList )
             {
                 // FIXME this is causing the exceptions
-                //Control control = new ControlImpl( cc.getControlType(), cc.getEncodedValue(),
cc.getCriticality() );
-                //message.add( control );
+//                Control control = new ControlImpl( cc.getControlType() );
+//                control.setValue( cc.getEncodedValue() );
+//                control.setCritical( cc.getCriticality() );
+//                
+//                message.add( control );
             }
         }
     }
+    
+
+    /**
+     * removes the Objects associated with the given message ID
+     * from future and response queue maps
+     * 
+     * @param msgId id of the message 
+     */
+    private void removeFromMaps( int msgId )
+    {
+        futureMap.remove( msgId );
+        respQueueMap.remove( msgId );
+    }
+    
+    
+    /**
+     * removes the blocking queue present in the
+     * respQueueMap and adds the given message to the blocking queue
+     *  
+     * @param msgId id of the message
+     * @param message the received response message to be added
+     */
+    private void addToRespQueueAndRemoveQueueRef( int msgId, AbstractMessage message )
+    {
+        BlockingQueue respQueue = respQueueMap.remove( msgId );
+        respQueue.add( message );
+    }
 }



Mime
View raw message