cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject svn commit: r918189 - in /incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra: locator/AbstractReplicationStrategy.java service/StorageProxy.java service/StorageService.java
Date Tue, 02 Mar 2010 21:25:39 GMT
Author: jbellis
Date: Tue Mar  2 21:25:39 2010
New Revision: 918189

URL: http://svn.apache.org/viewvc?rev=918189&view=rev
Log:
assign write hints to endpoints already getting a write message, if any.  patch by jbellis;
reviewed by Ryan King for CASSANDRA-822

Modified:
    incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
    incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java
    incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageService.java

Modified: incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java?rev=918189&r1=918188&r2=918189&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java	(original)
+++ incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java	Tue Mar  2 21:25:39 2010
@@ -52,6 +52,11 @@
         snitch_ = snitch;
     }
 
+    /**
+     * get the endpoints that should store the given Token, for the given table.
+     * Note that while the endpoints are conceptually a Set (no duplicates will be included),
+     * we return a List to avoid an extra allocation when sorting by proximity later.
+     */
     public abstract ArrayList<InetAddress> getNaturalEndpoints(Token token, TokenMetadata
metadata, String table);
     
     public WriteResponseHandler getWriteResponseHandler(int blockFor, ConsistencyLevel consistency_level,
String table)
@@ -65,51 +70,43 @@
     }
     
     /**
-     * returns map of {ultimate target: destination}, where if destination is not the same
-     * as the ultimate target, it is a "hinted" node, a node that will deliver the data to
+     * returns multimap of {live destination: ultimate targets}, where if target is not the
same
+     * as the destination, it is a "hinted" write, and will need to be sent to
      * the ultimate target when it becomes alive again.
-     *
-     * A destination node may be the destination for multiple targets.
      */
-    public Map<InetAddress, InetAddress> getHintedEndpoints(Token token, String table,
Collection<InetAddress> naturalEndpoints)
+    public Multimap<InetAddress, InetAddress> getHintedEndpoints(String table, Collection<InetAddress>
targets)
     {
-        Collection<InetAddress> targets = getWriteEndpoints(token, table, naturalEndpoints);
-        Set<InetAddress> usedEndpoints = new HashSet<InetAddress>();
-        Map<InetAddress, InetAddress> map = new HashMap<InetAddress, InetAddress>();
+        Multimap<InetAddress, InetAddress> map = HashMultimap.create(targets.size(),
1);
 
         IEndPointSnitch endPointSnitch = DatabaseDescriptor.getEndPointSnitch(table);
-        Set<InetAddress> liveNodes = Gossiper.instance.getLiveMembers();
 
+        // first, add the live endpoints
         for (InetAddress ep : targets)
         {
             if (FailureDetector.instance.isAlive(ep))
-            {
                 map.put(ep, ep);
-                usedEndpoints.add(ep);
-            }
-            else
-            {
-                // find another endpoint to store a hint on.  prefer endpoints that aren't
already in use
-                InetAddress hintLocation = null;
-                List<InetAddress> preferred = endPointSnitch.getSortedListByProximity(ep,
liveNodes);
-
-                for (InetAddress hintCandidate : preferred)
-                {
-                    if (!targets.contains(hintCandidate)
-                        && !usedEndpoints.contains(hintCandidate)
-                        && tokenMetadata_.isMember(hintCandidate))
-                    {
-                        hintLocation = hintCandidate;
-                        break;
-                    }
-                }
-                // if all endpoints are already in use, might as well store it locally to
save the network trip
-                if (hintLocation == null)
-                    hintLocation = FBUtilities.getLocalAddress();
+        }
 
-                map.put(ep, hintLocation);
-                usedEndpoints.add(hintLocation);
-            }
+        if (map.size() == targets.size())
+            return map; // everything was alive
+
+        // assign dead endpoints to be hinted to the closest live one, or to the local node
+        // (since it is trivially the closest) if none are alive.  This way, the cost of
doing
+        // a hint is only adding the hint header, rather than doing a full extra write, if
any
+        // destination nodes are alive.
+        //
+        // we do a 2nd pass on targets instead of using temporary storage,
+        // to optimize for the common case (everything was alive).
+        InetAddress localAddress = FBUtilities.getLocalAddress();
+        for (InetAddress ep : targets)
+        {
+            if (map.containsKey(ep))
+                continue;
+
+            InetAddress destination = map.isEmpty()
+                                    ? localAddress
+                                    : endPointSnitch.getSortedListByProximity(localAddress,
map.keySet()).get(0);
+            map.put(destination, ep);
         }
 
         return map;

Modified: incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java?rev=918189&r1=918188&r2=918189&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java	(original)
+++ incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java	Tue Mar  2 21:25:39 2010
@@ -20,38 +20,40 @@
 import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.net.InetAddress;
 import java.util.*;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import java.util.concurrent.Callable;
 import java.util.concurrent.Future;
-import java.lang.management.ManagementFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
 
+import org.apache.log4j.Logger;
 import org.apache.commons.lang.ArrayUtils;
 import org.apache.commons.lang.StringUtils;
 
 import com.google.common.collect.AbstractIterator;
+import com.google.common.collect.Multimap;
+import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.*;
-import java.net.InetAddress;
-
-import org.apache.cassandra.dht.*;
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.AbstractReplicationStrategy;
+import org.apache.cassandra.locator.TokenMetadata;
 import org.apache.cassandra.net.IAsyncResult;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.utils.LatencyTracker;
 import org.apache.cassandra.thrift.ConsistencyLevel;
 import org.apache.cassandra.thrift.UnavailableException;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.LatencyTracker;
 import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.WrappedRunnable;
-import org.apache.cassandra.locator.TokenMetadata;
-import org.apache.cassandra.concurrent.StageManager;
-
-import org.apache.log4j.Logger;
-
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
 
 
 public class StorageProxy implements StorageProxyMBean
@@ -101,25 +103,30 @@
         long startTime = System.nanoTime();
         try
         {
+            StorageService ss = StorageService.instance;
             for (final RowMutation rm: mutations)
             {
                 try
                 {
-                    List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(rm.getTable(),
rm.key());
-                    Map<InetAddress, InetAddress> endpointMap = StorageService.instance.getHintedEndpointMap(rm.getTable(),
rm.key(), naturalEndpoints);
+                    String table = rm.getTable();
+                    AbstractReplicationStrategy rs = ss.getReplicationStrategy(table);
+
+                    List<InetAddress> naturalEndpoints = ss.getNaturalEndpoints(table,
rm.key());
+                    Multimap<InetAddress,InetAddress> hintedEndpoints = rs.getHintedEndpoints(table,
naturalEndpoints);
                     Message unhintedMessage = null; // lazy initialize for non-local, unhinted
writes
 
                     // 3 cases:
                     // 1. local, unhinted write: run directly on write stage
                     // 2. non-local, unhinted write: send row mutation message
                     // 3. hinted write: add hint header, and send message
-                    for (Map.Entry<InetAddress, InetAddress> entry : endpointMap.entrySet())
+                    for (Map.Entry<InetAddress, Collection<InetAddress>> entry
: hintedEndpoints.asMap().entrySet())
                     {
-                        InetAddress target = entry.getKey();
-                        InetAddress hintedTarget = entry.getValue();
-                        if (target.equals(hintedTarget))
+                        InetAddress destination = entry.getKey();
+                        Collection<InetAddress> targets = entry.getValue();
+                        if (targets.size() == 1 && targets.iterator().next().equals(destination))
                         {
-                            if (target.equals(FBUtilities.getLocalAddress()))
+                            // unhinted writes
+                            if (destination.equals(FBUtilities.getLocalAddress()))
                             {
                                 if (logger.isDebugEnabled())
                                     logger.debug("insert writing local key " + rm.key());
@@ -137,17 +144,24 @@
                                 if (unhintedMessage == null)
                                     unhintedMessage = rm.makeRowMutationMessage();
                                 if (logger.isDebugEnabled())
-                                    logger.debug("insert writing key " + rm.key() + " to
" + unhintedMessage.getMessageId() + "@" + target);
-                                MessagingService.instance.sendOneWay(unhintedMessage, target);
+                                    logger.debug("insert writing key " + rm.key() + " to
" + unhintedMessage.getMessageId() + "@" + destination);
+                                MessagingService.instance.sendOneWay(unhintedMessage, destination);
                             }
                         }
                         else
                         {
+                            // hinted
                             Message hintedMessage = rm.makeRowMutationMessage();
-                            addHintHeader(hintedMessage, target);
-                            if (logger.isDebugEnabled())
-                                logger.debug("insert writing key " + rm.key() + " to " +
hintedMessage.getMessageId() + "@" + hintedTarget + " for " + target);
-                            MessagingService.instance.sendOneWay(hintedMessage, hintedTarget);
+                            for (InetAddress target : targets)
+                            {
+                                if (!target.equals(destination))
+                                {
+                                    addHintHeader(hintedMessage, target);
+                                    if (logger.isDebugEnabled())
+                                        logger.debug("insert writing key " + rm.key() + "
to " + hintedMessage.getMessageId() + "@" + destination + " for " + target);
+                                }
+                            }
+                            MessagingService.instance.sendOneWay(hintedMessage, destination);
                         }
                     }
                 }
@@ -176,31 +190,36 @@
         ArrayList<WriteResponseHandler> responseHandlers = new ArrayList<WriteResponseHandler>();
 
         RowMutation mostRecentRowMutation = null;
+        StorageService ss = StorageService.instance;
         try
         {
-            for (RowMutation rm: mutations)
+            for (RowMutation rm : mutations)
             {
                 mostRecentRowMutation = rm;
-                List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(rm.getTable(),
rm.key());
-                Map<InetAddress, InetAddress> endpointMap = StorageService.instance.getHintedEndpointMap(rm.getTable(),
rm.key(), naturalEndpoints);
-                int blockFor = determineBlockFor(endpointMap.size(), consistency_level);
-                
+                String table = rm.getTable();
+                AbstractReplicationStrategy rs = ss.getReplicationStrategy(table);
+
+                List<InetAddress> naturalEndpoints = ss.getNaturalEndpoints(table,
rm.key());
+                Collection<InetAddress> writeEndpoints = rs.getWriteEndpoints(StorageService.getPartitioner().getToken(rm.key()),
table, naturalEndpoints);
+                Multimap<InetAddress, InetAddress> hintedEndpoints = rs.getHintedEndpoints(table,
writeEndpoints);
+                int blockFor = determineBlockFor(writeEndpoints.size(), consistency_level);
+
                 // avoid starting a write we know can't achieve the required consistency
-                assureSufficientLiveNodes(endpointMap, blockFor, consistency_level);
+                assureSufficientLiveNodes(blockFor, writeEndpoints, hintedEndpoints, consistency_level);
                 
-                // send out the writes, as in insert() above, but this time with a callback
that tracks responses
-                final WriteResponseHandler responseHandler = StorageService.instance.getWriteResponseHandler(blockFor,
consistency_level, rm.getTable());
+                // send out the writes, as in mutate() above, but this time with a callback
that tracks responses
+                final WriteResponseHandler responseHandler = ss.getWriteResponseHandler(blockFor,
consistency_level, table);
                 responseHandlers.add(responseHandler);
                 Message unhintedMessage = null;
-                for (Map.Entry<InetAddress, InetAddress> entry : endpointMap.entrySet())
+                for (Map.Entry<InetAddress, Collection<InetAddress>> entry :
hintedEndpoints.asMap().entrySet())
                 {
-                    InetAddress naturalTarget = entry.getKey();
-                    InetAddress maybeHintedTarget = entry.getValue();
-    
-                    if (naturalTarget.equals(maybeHintedTarget))
+                    InetAddress destination = entry.getKey();
+                    Collection<InetAddress> targets = entry.getValue();
+
+                    if (targets.size() == 1 && targets.iterator().next().equals(destination))
                     {
-                        // not hinted
-                        if (naturalTarget.equals(FBUtilities.getLocalAddress()))
+                        // unhinted writes
+                        if (destination.equals(FBUtilities.getLocalAddress()))
                         {
                             insertLocalMessage(rm, responseHandler);
                         }
@@ -213,20 +232,27 @@
                                 MessagingService.instance.addCallback(responseHandler, unhintedMessage.getMessageId());
                             }
                             if (logger.isDebugEnabled())
-                                logger.debug("insert writing key " + rm.key() + " to " +
unhintedMessage.getMessageId() + "@" + naturalTarget);
-                            MessagingService.instance.sendOneWay(unhintedMessage, naturalTarget);
+                                logger.debug("insert writing key " + rm.key() + " to " +
unhintedMessage.getMessageId() + "@" + destination);
+                            MessagingService.instance.sendOneWay(unhintedMessage, destination);
                         }
                     }
                     else
                     {
+                        // hinted
                         Message hintedMessage = rm.makeRowMutationMessage();
-                        addHintHeader(hintedMessage, naturalTarget);
-                        // (hints are part of the callback and count towards consistency
only under CL.ANY
-                        if (consistency_level == ConsistencyLevel.ANY)
+                        for (InetAddress target : targets)
+                        {
+                            if (!target.equals(destination))
+                            {
+                                addHintHeader(hintedMessage, target);
+                                if (logger.isDebugEnabled())
+                                    logger.debug("insert writing key " + rm.key() + " to
" + hintedMessage.getMessageId() + "@" + destination + " for " + target);
+                            }
+                        }
+                        // (non-destination hints are part of the callback and count towards
consistency only under CL.ANY)
+                        if (writeEndpoints.contains(destination) || consistency_level ==
ConsistencyLevel.ANY)
                             MessagingService.instance.addCallback(responseHandler, hintedMessage.getMessageId());
-                        if (logger.isDebugEnabled())
-                            logger.debug("insert writing key " + rm.key() + " to " + hintedMessage.getMessageId()
+ "@" + maybeHintedTarget + " for " + naturalTarget);
-                        MessagingService.instance.sendOneWay(hintedMessage, maybeHintedTarget);
+                        MessagingService.instance.sendOneWay(hintedMessage, destination);
                     }
                 }
             }
@@ -250,30 +276,26 @@
 
     }
 
-    private static void assureSufficientLiveNodes(Map<InetAddress, InetAddress> endpointMap,
int blockFor, ConsistencyLevel consistencyLevel)
+    private static void assureSufficientLiveNodes(int blockFor, Collection<InetAddress>
writeEndpoints, Multimap<InetAddress, InetAddress> hintedEndpoints, ConsistencyLevel
consistencyLevel)
             throws UnavailableException
     {
         if (consistencyLevel == ConsistencyLevel.ANY)
         {
             // ensure there are blockFor distinct living nodes (hints are ok).
-            if (new HashSet(endpointMap.values()).size() < blockFor)
+            if (hintedEndpoints.keySet().size() < blockFor)
                 throw new UnavailableException();
         }
-        else
+
+        // count destinations that are part of the desired target set
+        int liveNodes = 0;
+        for (InetAddress destination : hintedEndpoints.keySet())
         {
-            // only count live + unhinted nodes.
-            int liveNodes = 0;
-            for (Map.Entry<InetAddress, InetAddress> entry : endpointMap.entrySet())
-            {
-                if (entry.getKey().equals(entry.getValue()))
-                {
-                    liveNodes++;
-                }
-            }
-            if (liveNodes < blockFor)
-            {
-                throw new UnavailableException();
-            }
+            if (writeEndpoints.contains(destination))
+                liveNodes++;
+        }
+        if (liveNodes < blockFor)
+        {
+            throw new UnavailableException();
         }
     }
 

Modified: incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageService.java?rev=918189&r1=918188&r2=918189&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageService.java	(original)
+++ incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageService.java	Tue Mar  2 21:25:39 2010
@@ -1177,18 +1177,6 @@
     }
 
     /**
-     * This method returns the N endpoints that are responsible for storing the
-     * specified key i.e for replication.
-     *
-     * @param key - key for which we need to find the endpoint return value -
-     * the endpoint responsible for this key
-     */
-    public Map<InetAddress, InetAddress> getHintedEndpointMap(String table, String
key, List<InetAddress> naturalEndpoints)
-    {
-        return getReplicationStrategy(table).getHintedEndpoints(partitioner_.getToken(key),
table, naturalEndpoints);
-    }
-
-    /**
      * This function finds the closest live endpoint that contains a given key.
      */
     public InetAddress findSuitableEndPoint(String table, String key) throws IOException,
UnavailableException



Mime
View raw message