cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject svn commit: r881281 - in /incubator/cassandra/trunk: ./ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/locator/ src/java/org/apache/cassandra/net/ src/java/org/apache/cassandra/service/
Date Tue, 17 Nov 2009 13:34:07 GMT
Author: jbellis
Date: Tue Nov 17 13:34:06 2009
New Revision: 881281

URL: http://svn.apache.org/viewvc?rev=881281&view=rev
Log:
add WriteResponseHandler combining the important parts of QuorumResponseHandler and WriteResponseResolver.
In particular, note that we (correctly) never send a write response of false, letting the
timeout take care of that should-never-happen case.  optimize local writes in insertBlocking, and fix HH.

patch by jbellis; reviewed by Jaakko Laine for CASSANDRA-558

Added:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/WriteResponseHandler.java
Removed:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/WriteResponseResolver.java
Modified:
    incubator/cassandra/trunk/CHANGES.txt
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStategy.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterQuorumResponseHandler.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterQuorumSyncResponseHandler.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java

Modified: incubator/cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/CHANGES.txt?rev=881281&r1=881280&r2=881281&view=diff
==============================================================================
--- incubator/cassandra/trunk/CHANGES.txt (original)
+++ incubator/cassandra/trunk/CHANGES.txt Tue Nov 17 13:34:06 2009
@@ -41,7 +41,8 @@
    interfaces (CASSANDRA-546)
  * stress.py benchmarking tool improvements (several tickets)
  * optimized replica placement code (CASSANDRA-525)
- * faster log replay on restart (CASSANDRA-539, -540)
+ * faster log replay on restart (CASSANDRA-539, CASSANDRA-540)
+ * optimized local-node writes (CASSANDRA-558)
  
 
 0.4.2

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java?rev=881281&r1=881280&r2=881281&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java Tue
Nov 17 13:34:06 2009
@@ -101,7 +101,7 @@
         return instance_;
     }
 
-    private static boolean sendMessage(InetAddress endPoint, String tableName, String key)
throws DigestMismatchException, TimeoutException, IOException, InvalidRequestException
+    private static boolean sendMessage(InetAddress endPoint, String tableName, String key)
throws IOException
     {
         if (!FailureDetector.instance().isAlive(endPoint))
         {
@@ -112,10 +112,18 @@
         Row row = table.get(key);
         RowMutation rm = new RowMutation(tableName, row);
         Message message = rm.makeRowMutationMessage();
-        QuorumResponseHandler<Boolean> quorumResponseHandler = new QuorumResponseHandler<Boolean>(1,
new WriteResponseResolver());
-        MessagingService.instance().sendRR(message, new InetAddress[] { endPoint }, quorumResponseHandler);
+        WriteResponseHandler responseHandler = new WriteResponseHandler(1);
+        MessagingService.instance().sendRR(message, new InetAddress[] { endPoint }, responseHandler);
 
-        return quorumResponseHandler.get();
+        try
+        {
+            responseHandler.get();
+        }
+        catch (TimeoutException e)
+        {
+            return false;
+        }
+        return true;
     }
 
     private static void deleteEndPoint(byte[] endpointAddress, String tableName, byte[] key,
long timestamp) throws IOException
@@ -205,7 +213,7 @@
                 Collection<IColumn> endpoints = keyColumn.getSubColumns();
                 for (IColumn hintEndPoint : endpoints)
                 {
-                    if (Arrays.equals(hintEndPoint.name(), targetEPBytes) && sendMessage(endPoint,
null, keyStr))
+                    if (Arrays.equals(hintEndPoint.name(), targetEPBytes) && sendMessage(endPoint,
tableName, keyStr))
                     {
                         if (endpoints.size() == 1)
                         {

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java?rev=881281&r1=881280&r2=881281&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
(original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
Tue Nov 17 13:34:06 2009
@@ -31,6 +31,7 @@
 import org.apache.cassandra.gms.FailureDetector;
 import org.apache.cassandra.service.IResponseResolver;
 import org.apache.cassandra.service.QuorumResponseHandler;
+import org.apache.cassandra.service.WriteResponseHandler;
 import org.apache.cassandra.utils.FBUtilities;
 
 /**
@@ -55,9 +56,9 @@
 
     public abstract ArrayList<InetAddress> getNaturalEndpoints(Token token, TokenMetadata
metadata);
     
-    public <T> QuorumResponseHandler<T> getResponseHandler(IResponseResolver<T>
responseResolver, int blockFor, int consistency_level)
+    public WriteResponseHandler getWriteResponseHandler(int blockFor, int consistency_level)
     {
-        return new QuorumResponseHandler<T>(blockFor, responseResolver);
+        return new WriteResponseHandler(blockFor);
     }
 
     public ArrayList<InetAddress> getNaturalEndpoints(Token token)

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStategy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStategy.java?rev=881281&r1=881280&r2=881281&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStategy.java
(original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStategy.java
Tue Nov 17 13:34:06 2009
@@ -202,17 +202,16 @@
      * return a DCQRH with a map of all the DC rep facor.
      */
     @Override
-    public <T> QuorumResponseHandler<T> getResponseHandler(IResponseResolver<T>
responseResolver, int blockFor, int consistency_level)
+    public WriteResponseHandler getWriteResponseHandler(int blockFor, int consistency_level)
     {
         if (consistency_level == ConsistencyLevel.DCQUORUM)
         {
-            List<InetAddress> endpoints = getLocalEndPoints();
-            return new DatacenterQuorumResponseHandler<T>(locQFactor, responseResolver);
+            return new DatacenterQuorumResponseHandler(locQFactor);
         }
         else if (consistency_level == ConsistencyLevel.DCQUORUMSYNC)
         {
-            return new DatacenterQuorumSyncResponseHandler<T>(getQuorumRepFactor(),
responseResolver);
+            return new DatacenterQuorumSyncResponseHandler(getQuorumRepFactor());
         }
-        return super.getResponseHandler(responseResolver, blockFor, consistency_level);
+        return super.getWriteResponseHandler(blockFor, consistency_level);
     }
 }
\ No newline at end of file

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=881281&r1=881280&r2=881281&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java Tue
Nov 17 13:34:06 2009
@@ -322,15 +322,20 @@
      */
     public String sendRR(Message message, InetAddress[] to, IAsyncCallback cb)
     {
-        String messageId = message.getMessageId();                        
-        callbackMap_.put(messageId, cb);
+        String messageId = message.getMessageId();
+        addCallback(cb, messageId);
         for ( int i = 0; i < to.length; ++i )
         {
             sendOneWay(message, to[i]);
         }
         return messageId;
     }
-    
+
+    public void addCallback(IAsyncCallback cb, String messageId)
+    {
+        callbackMap_.put(messageId, cb);
+    }
+
     /**
      * Send a message to a given endpoint. This method specifies a callback
      * which is invoked with the actual response.
@@ -344,7 +349,7 @@
     public String sendRR(Message message, InetAddress to, IAsyncCallback cb)
     {        
         String messageId = message.getMessageId();
-        callbackMap_.put(messageId, cb);
+        addCallback(cb, messageId);
         sendOneWay(message, to);
         return messageId;
     }
@@ -369,7 +374,7 @@
             throw new IllegalArgumentException("Number of messages and the number of endpoints
need to be same.");
         }
         String groupId = GuidGenerator.guid();
-        callbackMap_.put(groupId, cb);
+        addCallback(cb, groupId);
         for ( int i = 0; i < messages.length; ++i )
         {
             messages[i].setMessageId(groupId);

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterQuorumResponseHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterQuorumResponseHandler.java?rev=881281&r1=881280&r2=881281&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterQuorumResponseHandler.java
(original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterQuorumResponseHandler.java
Tue Nov 17 13:34:06 2009
@@ -16,16 +16,16 @@
  * provided in the input map. it will block till we recive response from (DC, n)
  * nodes.
  */
-public class DatacenterQuorumResponseHandler<T> extends QuorumResponseHandler<T>
+public class DatacenterQuorumResponseHandler extends WriteResponseHandler
 {
     private int blockFor;
     private IEndPointSnitch endpointsnitch;
     private InetAddress localEndpoint;
 
-    public DatacenterQuorumResponseHandler(int blockFor, IResponseResolver<T> responseResolver)
+    public DatacenterQuorumResponseHandler(int blockFor)
     {
         // Response is been managed by the map so the waitlist size really doesnt matter.
-        super(blockFor, responseResolver);
+        super(blockFor);
         this.blockFor = blockFor;
         endpointsnitch = DatabaseDescriptor.getEndPointSnitch();
         localEndpoint = FBUtilities.getLocalAddress();

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterQuorumSyncResponseHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterQuorumSyncResponseHandler.java?rev=881281&r1=881280&r2=881281&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterQuorumSyncResponseHandler.java
(original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterQuorumSyncResponseHandler.java
Tue Nov 17 13:34:06 2009
@@ -15,15 +15,15 @@
  * provided in the input map. it will block till we recive response from
  * n nodes in each of our data centers.
  */
-public class DatacenterQuorumSyncResponseHandler<T> extends QuorumResponseHandler<T>
+public class DatacenterQuorumSyncResponseHandler extends WriteResponseHandler
 {
     private final Map<String, Integer> dcResponses = new HashMap<String, Integer>();
     private final Map<String, Integer> responseCounts;
 
-    public DatacenterQuorumSyncResponseHandler(Map<String, Integer> responseCounts,
IResponseResolver<T> responseResolver)
+    public DatacenterQuorumSyncResponseHandler(Map<String, Integer> responseCounts)
     {
         // Response is been managed by the map so make it 1 for the superclass.
-        super(1, responseResolver);
+        super(1);
         this.responseCounts = responseCounts;
     }
 

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java?rev=881281&r1=881280&r2=881281&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java
(original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java
Tue Nov 17 13:34:06 2009
@@ -47,7 +47,7 @@
  */
 public class ReadResponseResolver implements IResponseResolver<Row>
 {
-	private static Logger logger_ = Logger.getLogger(WriteResponseResolver.class);
+	private static Logger logger_ = Logger.getLogger(ReadResponseResolver.class);
 
 	/*
 	 * This method for resolving read data should look at the timestamps of each

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=881281&r1=881280&r2=881281&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Tue
Nov 17 13:34:06 2009
@@ -19,7 +19,6 @@
 
 import java.io.IOError;
 import java.io.IOException;
-import java.io.IOError;
 import java.util.*;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -149,53 +148,85 @@
         }
     }
     
-    public static void insertBlocking(RowMutation rm, int consistency_level) throws UnavailableException
+    public static void insertBlocking(final RowMutation rm, int consistency_level) throws
UnavailableException
     {
         long startTime = System.currentTimeMillis();
-        Message message;
-        try
-        {
-            message = rm.makeRowMutationMessage();
-        }
-        catch (IOException e)
-        {
-            throw new RuntimeException(e);
-        }
         try
         {
             List<InetAddress> naturalEndpoints = StorageService.instance().getNaturalEndpoints(rm.key());
             Map<InetAddress, InetAddress> endpointMap = StorageService.instance().getHintedEndpointMap(rm.key(),
naturalEndpoints);
             int blockFor = determineBlockFor(naturalEndpoints.size(), endpointMap.size(),
consistency_level);
-            List<InetAddress> primaryNodes = getUnhintedNodes(endpointMap);
-            if (primaryNodes.size() < blockFor) // guarantee blockFor = W live nodes.
-            {
-                throw new UnavailableException();
-            }
-            QuorumResponseHandler<Boolean> quorumResponseHandler = StorageService.instance().getResponseHandler(new
WriteResponseResolver(), blockFor, consistency_level);
-            if (logger.isDebugEnabled())
-                logger.debug("insertBlocking writing key " + rm.key() + " to " + message.getMessageId()
+ "@[" + StringUtils.join(endpointMap.values(), ", ") + "]");
 
-            // Get all the targets and stick them in an array
-            MessagingService.instance().sendRR(message, primaryNodes.toArray(new InetAddress[primaryNodes.size()]),
quorumResponseHandler);
-            try
+            // avoid starting a write we know can't achieve the required consistency
+            int liveNodes = 0;
+            for (Map.Entry<InetAddress, InetAddress> entry : endpointMap.entrySet())
             {
-                if (!quorumResponseHandler.get())
-                    throw new UnavailableException();
+                if (entry.getKey().equals(entry.getValue()))
+                {
+                    liveNodes++;
+                }
             }
-            catch (DigestMismatchException e)
+            if (liveNodes < blockFor)
             {
-                throw new AssertionError(e);
+                throw new UnavailableException();
             }
-            if (primaryNodes.size() < endpointMap.size()) // Do we need to bother with
Hinted Handoff?
+
+            // send out the writes, as in insert() above, but this time with a callback that
tracks responses
+            final WriteResponseHandler responseHandler = StorageService.instance().getWriteResponseHandler(blockFor,
consistency_level);
+            Message unhintedMessage = null;
+            for (Map.Entry<InetAddress, InetAddress> entry : endpointMap.entrySet())
             {
-                for (Map.Entry<InetAddress, InetAddress> e : endpointMap.entrySet())
+                InetAddress target = entry.getKey();
+                InetAddress hintedTarget = entry.getValue();
+
+                if (target.equals(hintedTarget))
                 {
-                    if (!e.getKey().equals(e.getValue())) // Hinted Handoff to target
+                    if (target.equals(FBUtilities.getLocalAddress()))
+                    {
+                        if (logger.isDebugEnabled())
+                            logger.debug("insert writing local key " + rm.key());
+                        Runnable runnable = new Runnable()
+                        {
+                            public void run()
+                            {
+                                try
+                                {
+                                    rm.apply();
+                                    responseHandler.localResponse();
+                                }
+                                catch (IOException e)
+                                {
+                                    throw new IOError(e);
+                                }
+                            }
+                        };
+                        StageManager.getStage(StageManager.mutationStage_).execute(runnable);
+                    }
+                    else
                     {
-                        MessagingService.instance().sendOneWay(message, e.getValue());
+                        if (unhintedMessage == null)
+                        {
+                            unhintedMessage = rm.makeRowMutationMessage();
+                            MessagingService.instance().addCallback(responseHandler, unhintedMessage.getMessageId());
+                        }
+                        if (logger.isDebugEnabled())
+                            logger.debug("insert writing key " + rm.key() + " to " + unhintedMessage.getMessageId()
+ "@" + target);
+                        MessagingService.instance().sendOneWay(unhintedMessage, target);
                     }
                 }
+                else
+                {
+                    // (hints aren't part of the callback since they don't count towards
consistency until they are on the final destination node)
+                    Message hintedMessage = rm.makeRowMutationMessage();
+                    hintedMessage.addHeader(RowMutation.HINT, target.getAddress());
+                    if (logger.isDebugEnabled())
+                        logger.debug("insert writing key " + rm.key() + " to " + hintedMessage.getMessageId()
+ "@" + hintedTarget + " for " + target);
+                    MessagingService.instance().sendOneWay(hintedMessage, hintedTarget);
+                }
             }
+
+            // wait for writes.  throws timeoutexception if necessary
+            responseHandler.get();
         }
         catch (TimeoutException e)
         {
@@ -211,19 +242,6 @@
         }
     }
 
-    private static List<InetAddress> getUnhintedNodes(Map<InetAddress, InetAddress>
endpointMap)
-    {
-        List<InetAddress> liveEndPoints = new ArrayList<InetAddress>(endpointMap.size());
-        for (Map.Entry<InetAddress, InetAddress> e : endpointMap.entrySet())
-        {
-            if (e.getKey().equals(e.getValue()))
-            {
-                liveEndPoints.add(e.getKey());
-            }
-        }
-        return liveEndPoints;
-    }
-
     private static int determineBlockFor(int naturalTargets, int hintedTargets, int consistency_level)
     {
         assert naturalTargets >= 1;

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=881281&r1=881280&r2=881281&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Tue
Nov 17 13:34:06 2009
@@ -1037,9 +1037,9 @@
         unbootstrap(finishMoving);
     }
 
-    public <T> QuorumResponseHandler<T> getResponseHandler(IResponseResolver<T>
responseResolver, int blockFor, int consistency_level)
+    public WriteResponseHandler getWriteResponseHandler(int blockFor, int consistency_level)
     {
-        return replicationStrategy_.getResponseHandler(responseResolver, blockFor, consistency_level);
+        return replicationStrategy_.getWriteResponseHandler(blockFor, consistency_level);
     }
 
     public AbstractReplicationStrategy getReplicationStrategy()

Added: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/WriteResponseHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/WriteResponseHandler.java?rev=881281&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/WriteResponseHandler.java
(added)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/WriteResponseHandler.java
Tue Nov 17 13:34:06 2009
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import java.util.List;
+import java.util.ArrayList;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.io.IOException;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.net.IAsyncCallback;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.SimpleCondition;
+
+import org.apache.log4j.Logger;
+
+public class WriteResponseHandler implements IAsyncCallback
+{
+    protected static final Logger logger = Logger.getLogger( WriteResponseHandler.class );
+    protected final SimpleCondition condition = new SimpleCondition();
+    private final int responseCount;
+    protected final List<Message> responses;
+    protected int localResponses;
+    private final long startTime;
+
+    public WriteResponseHandler(int responseCount)
+    {
+        assert 1 <= responseCount && responseCount <= DatabaseDescriptor.getReplicationFactor()
+            : "invalid response count " + responseCount;
+
+        this.responseCount = responseCount;
+        responses = new ArrayList<Message>(responseCount);
+        startTime = System.currentTimeMillis();
+    }
+
+    public void get() throws TimeoutException
+    {
+        try
+        {
+            long timeout = System.currentTimeMillis() - startTime + DatabaseDescriptor.getRpcTimeout();
+            boolean success;
+            try
+            {
+                success = condition.await(timeout, TimeUnit.MILLISECONDS);
+            }
+            catch (InterruptedException ex)
+            {
+                throw new AssertionError(ex);
+            }
+
+            if (!success)
+            {
+                throw new TimeoutException("Operation timed out - received only " + responses.size()
+ localResponses + " responses");
+            }
+        }
+        finally
+        {
+            for (Message response : responses)
+            {
+                MessagingService.removeRegisteredCallback(response.getMessageId());
+            }
+        }
+    }
+
+    public synchronized void response(Message message)
+    {
+        if (condition.isSignaled())
+            return;
+        responses.add(message);
+        maybeSignal();
+    }
+
+    public synchronized void localResponse()
+    {
+        if (condition.isSignaled())
+            return;
+        localResponses++;
+        maybeSignal();
+    }
+
+    private void maybeSignal()
+    {
+        if (responses.size() + localResponses >= responseCount)
+        {
+            condition.signal();
+        }
+    }
+}
\ No newline at end of file



Mime
View raw message