activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r509728 - /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java
Date Tue, 20 Feb 2007 20:05:01 GMT
Author: rajdavies
Date: Tue Feb 20 12:05:00 2007
New Revision: 509728

URL: http://svn.apache.org/viewvc?view=rev&rev=509728
Log:
small op

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java?view=diff&rev=509728&r1=509727&r2=509728
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java Tue Feb 20 12:05:00 2007
@@ -1,125 +1,122 @@
 /**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
  * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
  */
+
 package org.apache.activemq.transport;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.Iterator;
-
+import java.util.Map;
 import org.apache.activemq.command.Command;
 import org.apache.activemq.command.ExceptionResponse;
 import org.apache.activemq.command.Response;
 import org.apache.activemq.util.IntSequenceGenerator;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-
 import java.util.concurrent.ConcurrentHashMap;
 
-
 /**
- * Adds the incrementing sequence number to commands along with performing the corelation of
- * responses to requests to create a blocking request-response semantics.
+ * Adds the incrementing sequence number to commands along with performing the corelation of responses to requests to
+ * create a blocking request-response semantics.
  * 
  * @version $Revision: 1.4 $
  */
-public class ResponseCorrelator extends TransportFilter {
-    
-    private static final Log log = LogFactory.getLog(ResponseCorrelator.class);
-    
-    private final ConcurrentHashMap requestMap = new ConcurrentHashMap();
+public class ResponseCorrelator extends TransportFilter{
+
+    private static final Log log=LogFactory.getLog(ResponseCorrelator.class);
+    private final Map requestMap=new HashMap();
     private IntSequenceGenerator sequenceGenerator;
-    private final boolean debug = log.isDebugEnabled();
-    public ResponseCorrelator(Transport next) {
-        this(next, new IntSequenceGenerator());
+    private final boolean debug=log.isDebugEnabled();
+
+    public ResponseCorrelator(Transport next){
+        this(next,new IntSequenceGenerator());
     }
-    
-    public ResponseCorrelator(Transport next, IntSequenceGenerator sequenceGenerator) {
+
+    public ResponseCorrelator(Transport next,IntSequenceGenerator sequenceGenerator){
         super(next);
-        this.sequenceGenerator = sequenceGenerator;
+        this.sequenceGenerator=sequenceGenerator;
     }
 
-    public void oneway(Object o) throws IOException {
-    	Command command = (Command) o;
+    public void oneway(Object o) throws IOException{
+        Command command=(Command)o;
         command.setCommandId(sequenceGenerator.getNextSequenceId());
         command.setResponseRequired(false);
         next.oneway(command);
     }
 
-    public FutureResponse asyncRequest(Object o, ResponseCallback responseCallback) throws IOException {
-    	Command command = (Command) o;
+    public FutureResponse asyncRequest(Object o,ResponseCallback responseCallback) throws IOException{
+        Command command=(Command)o;
         command.setCommandId(sequenceGenerator.getNextSequenceId());
         command.setResponseRequired(true);
-        FutureResponse future = new FutureResponse(responseCallback);
-        requestMap.put(new Integer(command.getCommandId()), future);
+        FutureResponse future=new FutureResponse(responseCallback);
+        synchronized(requestMap){
+            requestMap.put(new Integer(command.getCommandId()),future);
+        }
         next.oneway(command);
         return future;
     }
-    
-    public Object request(Object command) throws IOException { 
-        FutureResponse response = asyncRequest(command, null);
+
+    public Object request(Object command) throws IOException{
+        FutureResponse response=asyncRequest(command,null);
         return response.getResult();
     }
-    
-    public Object request(Object command,int timeout) throws IOException {
-        FutureResponse response = asyncRequest(command, null);
+
+    public Object request(Object command,int timeout) throws IOException{
+        FutureResponse response=asyncRequest(command,null);
         return response.getResult(timeout);
     }
-    
-    public void onCommand(Object o) {
-    	Command command = (Command) o;
-        
-        if( command.isResponse() ) {
-            Response response = (Response) command;
-            FutureResponse future = (FutureResponse) requestMap.remove(new Integer(response.getCorrelationId()));
-            if( future!=null ) {
+
+    public void onCommand(Object o){
+        Command command=(Command)o;
+        if(command.isResponse()){
+            Response response=(Response)command;
+            FutureResponse future=null;
+            synchronized(requestMap){
+                future=(FutureResponse)requestMap.remove(new Integer(response.getCorrelationId()));
+            }
+            if(future!=null){
                 future.set(response);
-            } else {
-                if( debug ) log.debug("Received unexpected response for command id: "+response.getCorrelationId());
+            }else{
+                if(debug)
+                    log.debug("Received unexpected response for command id: "+response.getCorrelationId());
             }
-        } else {
+        }else{
             getTransportListener().onCommand(command);
         }
     }
-    
+
     /**
-     * If an async exception occurs, then assume no responses will arrive for any of
-     * current requests.  Lets let them know of the problem.
+     * If an async exception occurs, then assume no responses will arrive for any of current requests. Lets let them
+     * know of the problem.
      */
-    public void onException(IOException error) {
-        
+    public void onException(IOException error){
         // Copy and Clear the request Map
-        ArrayList requests = new ArrayList(requestMap.values());
+        ArrayList requests=new ArrayList(requestMap.values());
         requestMap.clear();
-        
-        for (Iterator iter = requests.iterator(); iter.hasNext();) {
-            FutureResponse fr = (FutureResponse) iter.next();
+        for(Iterator iter=requests.iterator();iter.hasNext();){
+            FutureResponse fr=(FutureResponse)iter.next();
             fr.set(new ExceptionResponse(error));
         }
-        
         super.onException(error);
     }
-    
-    public IntSequenceGenerator getSequenceGenerator() {
+
+    public IntSequenceGenerator getSequenceGenerator(){
         return sequenceGenerator;
     }
 
-    public String toString() {
+    public String toString(){
         return next.toString();
     }
-
 }



Mime
View raw message