cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject svn commit: r829819 - in /incubator/cassandra/trunk: interface/ interface/gen-java/org/apache/cassandra/service/ src/java/org/apache/cassandra/locator/ src/java/org/apache/cassandra/service/
Date Mon, 26 Oct 2009 15:00:20 GMT
Author: jbellis
Date: Mon Oct 26 15:00:20 2009
New Revision: 829819

URL: http://svn.apache.org/viewvc?rev=829819&view=rev
Log:
add dcquorum/dcquorumsync consistency levels, representing "a quorum of the replicas in the
current dc" and "a quorum of the replicas in each dc," respectively.
patch by Vijay Parthasarathy; reviewed by jbellis for CASSANDRA-492

Added:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterEndPointSnitch.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStategy.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterQuorumResponseHandler.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterQuorumSyncResponseHandler.java
Modified:
    incubator/cassandra/trunk/interface/cassandra.thrift
    incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/ConsistencyLevel.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/EndPointSnitch.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/IEndPointSnitch.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java

Modified: incubator/cassandra/trunk/interface/cassandra.thrift
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/interface/cassandra.thrift?rev=829819&r1=829818&r2=829819&view=diff
==============================================================================
--- incubator/cassandra/trunk/interface/cassandra.thrift (original)
+++ incubator/cassandra/trunk/interface/cassandra.thrift Mon Oct 26 15:00:20 2009
@@ -92,7 +92,9 @@
     ZERO = 0,
     ONE = 1,
     QUORUM = 2,
-    ALL = 3,
+    DCQUORUM = 3,
+    DCQUORUMSYNC = 4,
+    ALL = 5,
 }
 
 struct ColumnParent {

Modified: incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/ConsistencyLevel.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/ConsistencyLevel.java?rev=829819&r1=829818&r2=829819&view=diff
==============================================================================
--- incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/ConsistencyLevel.java
(original)
+++ incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/ConsistencyLevel.java
Mon Oct 26 15:00:20 2009
@@ -38,18 +38,24 @@
   public static final int ZERO = 0;
   public static final int ONE = 1;
   public static final int QUORUM = 2;
-  public static final int ALL = 3;
+  public static final int DCQUORUM = 3;
+  public static final int DCQUORUMSYNC = 4;
+  public static final int ALL = 5;
 
   public static final IntRangeSet VALID_VALUES = new IntRangeSet(
     ZERO, 
     ONE, 
     QUORUM, 
+    DCQUORUM, 
+    DCQUORUMSYNC, 
     ALL );
 
   public static final Map<Integer, String> VALUES_TO_NAMES = new HashMap<Integer,
String>() {{
     put(ZERO, "ZERO");
     put(ONE, "ONE");
     put(QUORUM, "QUORUM");
+    put(DCQUORUM, "DCQUORUM");
+    put(DCQUORUMSYNC, "DCQUORUMSYNC");
     put(ALL, "ALL");
   }};
 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java?rev=829819&r1=829818&r2=829819&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
(original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
Mon Oct 26 15:00:20 2009
@@ -18,17 +18,20 @@
 */
 package org.apache.cassandra.locator;
 
+import java.net.InetAddress;
 import java.util.*;
 
 import org.apache.log4j.Logger;
 
-import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.gms.FailureDetector;
-import java.net.InetAddress;
-
+import org.apache.cassandra.service.IResponseResolver;
+import org.apache.cassandra.service.InvalidRequestException;
+import org.apache.cassandra.service.QuorumResponseHandler;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.config.DatabaseDescriptor;
 
 /**
  * This class contains a helper method that will be used by
@@ -53,6 +56,11 @@
     }
 
     public abstract ArrayList<InetAddress> getNaturalEndpoints(Token token, Map<Token,
InetAddress> tokenToEndPointMap);
+    
+    /**
+     * Creates the callback used to collect replica responses for a request.
+     * This base implementation does not look at {@code consistency_level}; it
+     * always builds a plain {@link QuorumResponseHandler} from {@code blockFor}
+     * and {@code responseResolver}.
+     */
+    public <T> QuorumResponseHandler<T> getResponseHandler(IResponseResolver<T> responseResolver, int blockFor, int consistency_level) throws InvalidRequestException
+    {
+        QuorumResponseHandler<T> handler = new QuorumResponseHandler<T>(blockFor, responseResolver);
+        return handler;
+    }
 
     public ArrayList<InetAddress> getNaturalEndpoints(Token token)
     {

Added: incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterEndPointSnitch.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterEndPointSnitch.java?rev=829819&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterEndPointSnitch.java
(added)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterEndPointSnitch.java
Mon Oct 26 15:00:20 2009
@@ -0,0 +1,189 @@
+package org.apache.cassandra.locator;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.Map;
+import javax.xml.parsers.ParserConfigurationException;
+
+import org.apache.log4j.Logger;
+
+import org.apache.cassandra.utils.XMLUtils;
+import org.xml.sax.SAXException;
+
+/**
+ * Snitch that resolves an endpoint to a data center and rack from the second
+ * and third octets of its IP address.
+ * <p/>
+ * The mapping is read from an XML file by {@link #reloadConfiguration()}, which
+ * the constructor calls once. For every data center the file supplies a name,
+ * the second IP octet, a replication factor, and a list of racks (name plus
+ * third IP octet).
+ */
+public class DatacenterEndPointSnitch implements IEndPointSnitch
+{
+    /** Second octet -> (third octet -> data center name). */
+    private Map<Byte, Map<Byte, String>> ipDC = new HashMap<Byte, Map<Byte, String>>();
+    /** Second octet -> (third octet -> rack name). */
+    private Map<Byte, Map<Byte, String>> ipRAC = new HashMap<Byte, Map<Byte, String>>();
+    /** Data center name -> configured replication factor. */
+    private Map<String, Integer> dcRepFactor = new HashMap<String, Integer>();
+    /** Data center name -> quorum size, computed as replicationFactor / 2 + 1. */
+    private Map<String, Integer> quorumDCMap = new HashMap<String, Integer>();
+
+    private XMLUtils xmlUtils;
+
+    /**
+     * The default rack property file to be read.
+     */
+    private static final String DEFAULT_RACK_CONFIG_FILE = "/etc/cassandra/DC-Config.xml";
+
+    /**
+     * Reference to the logger.
+     */
+    private static final Logger logger_ = Logger.getLogger(DatacenterEndPointSnitch.class);
+
+    /**
+     * Opens the XML configuration file and loads it.
+     */
+    public DatacenterEndPointSnitch() throws IOException,
+                                             ParserConfigurationException, SAXException
+    {
+        xmlUtils = new XMLUtils(DEFAULT_RACK_CONFIG_FILE);
+        reloadConfiguration();
+    }
+
+    /**
+     * Returns the name of the rack the endpoint resides in, or null when the
+     * endpoint's third octet is not a configured rack of its data center.
+     *
+     * @throws UnknownHostException if the endpoint's second octet does not
+     *                              belong to any configured data center
+     */
+    public String getRackForEndPoint(InetAddress endPoint)
+            throws UnknownHostException
+    {
+        return lookup(ipRAC, endPoint);
+    }
+
+    /**
+     * Loads the configuration from the XML file. Each data center entry must
+     * provide a name, ip2ndQuad and replicationFactor; each rack entry a name
+     * and ip3rdQuad.
+     * <p/>
+     * The new tables are built on the side and only replace the current ones
+     * once the whole file has been parsed, so entries removed from the file
+     * disappear on reload and a failed reload leaves the previous
+     * configuration untouched.
+     *
+     * @throws IOException if the file cannot be read or parsed; the original
+     *                     failure is attached as the cause
+     */
+    public void reloadConfiguration() throws IOException
+    {
+        Map<Byte, Map<Byte, String>> newIpDC = new HashMap<Byte, Map<Byte, String>>();
+        Map<Byte, Map<Byte, String>> newIpRAC = new HashMap<Byte, Map<Byte, String>>();
+        Map<String, Integer> newRepFactor = new HashMap<String, Integer>();
+        Map<String, Integer> newQuorum = new HashMap<String, Integer>();
+        try
+        {
+            String[] dcNames = xmlUtils.getNodeValues("/EndPoints/DataCenter/name");
+            for (String dcName : dcNames)
+            {
+                // Parse the data center's second-octet value and replication factor.
+                String dcXPath = "/EndPoints/DataCenter[name='" + dcName + "']";
+                String dcIPQuad = xmlUtils.getNodeValue(dcXPath + "/ip2ndQuad");
+                String replicationFactor = xmlUtils.getNodeValue(dcXPath + "/replicationFactor");
+                byte dcByte = intToByte(Integer.parseInt(dcIPQuad));
+                int dcReF = Integer.parseInt(replicationFactor);
+                newRepFactor.put(dcName, dcReF);
+                newQuorum.put(dcName, (dcReF / 2 + 1));
+
+                // Data centers that share a second octet share one pair of tables.
+                Map<Byte, String> dcRackMap = newIpDC.get(dcByte);
+                if (null == dcRackMap)
+                {
+                    dcRackMap = new HashMap<Byte, String>();
+                    newIpDC.put(dcByte, dcRackMap);
+                }
+                Map<Byte, String> rackDcMap = newIpRAC.get(dcByte);
+                if (null == rackDcMap)
+                {
+                    rackDcMap = new HashMap<Byte, String>();
+                    newIpRAC.put(dcByte, rackDcMap);
+                }
+
+                String[] racNames = xmlUtils.getNodeValues(dcXPath + "/rack/name");
+                for (String racName : racNames)
+                {
+                    // Parse the rack's third-octet value.
+                    String racIPQuad = xmlUtils.getNodeValue(dcXPath + "/rack[name = '" + racName + "']/ip3rdQuad");
+                    byte racByte = intToByte(Integer.parseInt(racIPQuad));
+                    dcRackMap.put(racByte, dcName);
+                    rackDcMap.put(racByte, racName);
+                }
+            }
+        }
+        catch (Exception ioe)
+        {
+            throw new IOException("Could not process " + DEFAULT_RACK_CONFIG_FILE, ioe);
+        }
+
+        // Parsing succeeded: switch over to the freshly built tables.
+        ipDC = newIpDC;
+        ipRAC = newIpRAC;
+        dcRepFactor = newRepFactor;
+        quorumDCMap = newQuorum;
+    }
+
+    /**
+     * Returns true if both hosts resolve to the same rack name, false
+     * otherwise (including when either host's third octet is not a configured
+     * rack). Only rack names are compared, so racks in different data centers
+     * that share a name compare as equal.
+     *
+     * @throws UnknownHostException if either host's second octet does not
+     *                              belong to any configured data center
+     */
+    public boolean isOnSameRack(InetAddress host, InetAddress host2)
+            throws UnknownHostException
+    {
+        String rack = lookup(ipRAC, host);
+        String rack2 = lookup(ipRAC, host2);
+        return rack != null && rack.equals(rack2);
+    }
+
+    /**
+     * Returns true if both hosts resolve to the same data center name, false
+     * otherwise (including when either host's third octet is not configured).
+     *
+     * @throws UnknownHostException if either host's second octet does not
+     *                              belong to any configured data center
+     */
+    public boolean isInSameDataCenter(InetAddress host, InetAddress host2)
+            throws UnknownHostException
+    {
+        String dc = lookup(ipDC, host);
+        String dc2 = lookup(ipDC, host2);
+        return dc != null && dc.equals(dc2);
+    }
+
+    /**
+     * Returns a copy of the replication map: the key is the data center name
+     * and the value is the replication factor configured for it.
+     */
+    public HashMap<String, Integer> getMapReplicationFactor()
+    {
+        return new HashMap<String, Integer>(dcRepFactor);
+    }
+
+    /**
+     * Returns a copy of the quorum map: the key is the data center name and
+     * the value is the quorum size (replication factor / 2 + 1) for it.
+     */
+    public HashMap<String, Integer> getMapQuorumFactor()
+    {
+        return new HashMap<String, Integer>(quorumDCMap);
+    }
+
+    /**
+     * Resolves an endpoint through one of the octet tables.
+     *
+     * @return the configured name, or null if the third octet has no entry
+     * @throws UnknownHostException if the second octet has no entry at all
+     */
+    private static String lookup(Map<Byte, Map<Byte, String>> table, InetAddress endPoint)
+            throws UnknownHostException
+    {
+        byte[] octets = endPoint.getAddress();
+        Map<Byte, String> byThirdOctet = table.get(octets[1]);
+        if (byThirdOctet == null)
+        {
+            throw new UnknownHostException("No data center configured for " + endPoint.getHostAddress()
+                                           + " in " + DEFAULT_RACK_CONFIG_FILE);
+        }
+        return byThirdOctet.get(octets[2]);
+    }
+
+    /**
+     * Keeps the low 8 bits of n, so that octet values 128-255 map onto the
+     * negative bytes returned by {@link InetAddress#getAddress()}.
+     */
+    public static byte intToByte(int n)
+    {
+        return (byte) (n & 0x000000ff);
+    }
+
+    /**
+     * Returns the name of the data center the endpoint belongs to, or null
+     * when the endpoint's third octet is not configured for its data center.
+     *
+     * @throws UnknownHostException if the endpoint's second octet does not
+     *                              belong to any configured data center
+     */
+    public String getLocation(InetAddress endpoint) throws UnknownHostException
+    {
+        return lookup(ipDC, endpoint);
+    }
+}

Added: incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStategy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStategy.java?rev=829819&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStategy.java
(added)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStategy.java
Mon Oct 26 15:00:20 2009
@@ -0,0 +1,216 @@
+package org.apache.cassandra.locator;
+
+import java.io.IOException;
+import java.io.IOError;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.*;
+import java.util.Map.Entry;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.service.*;
+
+/**
+ * Replication strategy that picks replicas separately in every data center
+ * reported by the {@link DatacenterEndPointSnitch}, using the replication
+ * factor the snitch configuration gives for that data center. Within a data
+ * center, replicas are taken by walking that data center's sorted tokens in
+ * ring order starting at the search token; the last replica is preferably
+ * placed on a rack other than the primary's.
+ * <p/>
+ * The per-data-center token lists are cached and rebuilt whenever the number
+ * of tokens in the supplied token map differs from the cached count.
+ */
+public class DatacenterShardStategy extends AbstractReplicationStrategy
+{
+    /** Data center name -> sorted tokens owned by endpoints in that data center. */
+    private static Map<String, List<Token>> dcMap = new HashMap<String, List<Token>>();
+    /** Data center name -> replication factor, as reported by the snitch. */
+    private static Map<String, Integer> dcReplicationFactor = new HashMap<String, Integer>();
+    /** Data center name -> quorum size (replication factor / 2 + 1). */
+    private static Map<String, Integer> quorumRepFactor = new HashMap<String, Integer>();
+    /** Quorum size of the data center this node is in. */
+    private static int locQFactor = 0;
+    private static DatacenterEndPointSnitch endPointSnitch;
+    /** Tokens seen by the last successful load; null until the first load. */
+    ArrayList<Token> tokens;
+
+    /** Endpoints located in the same data center as this node. */
+    private List<InetAddress> localEndPoints = new ArrayList<InetAddress>();
+
+    private List<InetAddress> getLocalEndPoints()
+    {
+        return new ArrayList<InetAddress>(localEndPoints);
+    }
+
+    private Map<String, Integer> getQuorumRepFactor()
+    {
+        return new HashMap<String, Integer>(quorumRepFactor);
+    }
+
+    /**
+     * Rebuilds the cached topology from the snitch: the tokens of each data
+     * center, the endpoints of the local data center, and the replication and
+     * quorum factors.
+     * <p/>
+     * Everything is computed into fresh collections and stored only at the
+     * end, so repeated loads do not accumulate duplicate tokens or endpoints,
+     * and a load that fails part-way is retried on the next call because
+     * {@code this.tokens} is still unchanged.
+     *
+     * @param tokenToEndPointMap maps every token to the endpoint owning it
+     * @param tokens             the tokens to distribute over the data centers
+     */
+    private void loadEndPoints(Map<Token, InetAddress> tokenToEndPointMap, Collection<Token> tokens) throws IOException
+    {
+        endPointSnitch = (DatacenterEndPointSnitch) StorageService.instance().getEndPointSnitch();
+        ArrayList<Token> loadedTokens = new ArrayList<Token>(tokens);
+        String localDC = endPointSnitch.getLocation(InetAddress.getLocalHost());
+
+        Map<String, List<Token>> tokensByDc = new HashMap<String, List<Token>>();
+        List<InetAddress> localNodes = new ArrayList<InetAddress>();
+        for (Token token : loadedTokens)
+        {
+            InetAddress endPoint = tokenToEndPointMap.get(token);
+            String dataCenter = endPointSnitch.getLocation(endPoint);
+            if (dataCenter.equals(localDC))
+            {
+                localNodes.add(endPoint);
+            }
+            List<Token> lst = tokensByDc.get(dataCenter);
+            if (lst == null)
+            {
+                lst = new ArrayList<Token>();
+                tokensByDc.put(dataCenter, lst);
+            }
+            lst.add(token);
+        }
+        // getNaturalEndpointsInternal binary-searches these lists, so they must be sorted.
+        for (List<Token> dcTokens : tokensByDc.values())
+        {
+            Collections.sort(dcTokens);
+        }
+
+        Map<String, Integer> replicationByDc = endPointSnitch.getMapReplicationFactor();
+        Map<String, Integer> quorumByDc = new HashMap<String, Integer>();
+        int localQuorum = locQFactor;
+        for (Entry<String, Integer> entry : replicationByDc.entrySet())
+        {
+            String datacenter = entry.getKey();
+            int qFactor = (entry.getValue() / 2 + 1);
+            quorumByDc.put(datacenter, qFactor);
+            if (datacenter.equals(localDC))
+            {
+                localQuorum = qFactor;
+            }
+        }
+
+        dcMap = tokensByDc;
+        localEndPoints = localNodes;
+        dcReplicationFactor = replicationByDc;
+        quorumRepFactor = quorumByDc;
+        locQFactor = localQuorum;
+        this.tokens = loadedTokens;
+    }
+
+    public DatacenterShardStategy(TokenMetadata tokenMetadata, IPartitioner<Token> partitioner, int replicas, int storagePort)
+    throws UnknownHostException
+    {
+        super(tokenMetadata, partitioner, replicas, storagePort);
+        assert (DatabaseDescriptor.getEndPointSnitch() instanceof DatacenterEndPointSnitch);
+    }
+
+    /**
+     * Returns the replica endpoints for the token, concatenated over all data
+     * centers. An IOException raised while loading the topology is rethrown
+     * as an IOError.
+     */
+    @Override
+    public ArrayList<InetAddress> getNaturalEndpoints(Token searchToken, Map<Token, InetAddress> tokenToEndPointMap)
+    {
+        try
+        {
+            return getNaturalEndpointsInternal(searchToken, tokenToEndPointMap);
+        }
+        catch (IOException e)
+        {
+             throw new IOError(e);
+        }
+    }
+
+    private ArrayList<InetAddress> getNaturalEndpointsInternal(Token searchToken, Map<Token, InetAddress> tokenToEndPointMap) throws IOException
+    {
+        ArrayList<InetAddress> endpoints = new ArrayList<InetAddress>();
+        // (Re)load the cached topology when nothing is cached or the token count changed.
+        if (null == tokens || this.tokens.size() != tokenToEndPointMap.keySet().size())
+        {
+            loadEndPoints(tokenToEndPointMap, tokenToEndPointMap.keySet());
+        }
+
+        for (String dc : dcMap.keySet())
+        {
+            int foundCount = 0;
+            ArrayList<InetAddress> forloopReturn = new ArrayList<InetAddress>();
+            int replicas_ = dcReplicationFactor.get(dc);
+            // Kept as a raw List, as in the original code.
+            // NOTE(review): parameterizing it may not satisfy binarySearch's
+            // Comparable bound for the raw Token type - confirm before changing.
+            List dcTokens = dcMap.get(dc);
+            int index = Collections.binarySearch(dcTokens, searchToken);
+            if (index < 0)
+            {
+                // Not an exact match: convert to the insertion point, wrapping past the end.
+                index = (index + 1) * (-1);
+                if (index >= dcTokens.size())
+                {
+                    index = 0;
+                }
+            }
+            int totalNodes = dcTokens.size();
+            // The node at the index is always the first replica. With a
+            // replication factor of 1 both loops below are skipped by their
+            // foundCount < replicas_ condition and only this node is returned.
+            InetAddress primaryHost = tokenToEndPointMap.get(dcTokens.get(index));
+            forloopReturn.add(primaryHost);
+            foundCount++;
+
+            int startIndex = (index + 1) % totalNodes;
+            for (int i = startIndex, count = 1; count < totalNodes && foundCount < replicas_; ++count, i = (i + 1) % totalNodes)
+            {
+                InetAddress endPointOfIntrest = tokenToEndPointMap.get(dcTokens.get(i));
+                // Take the following nodes in order until one slot is left.
+                if ((replicas_ - 1) > foundCount)
+                {
+                    forloopReturn.add(endPointOfIntrest);
+                    foundCount++;
+                    continue;
+                }
+                // Last slot: keep scanning for a node on a different rack.
+                if (!endPointSnitch.isOnSameRack(primaryHost, endPointOfIntrest))
+                {
+                    forloopReturn.add(endPointOfIntrest);
+                    foundCount++;
+                    break;
+                }
+            }
+
+            /*
+            * If we found N nodes we are done and this loop exits immediately.
+            * Otherwise (no node on another rack) fill the remaining slots
+            * with the next nodes that have not been chosen yet.
+            */
+            for (int i = startIndex, count = 1; count < totalNodes && foundCount < replicas_; ++count, i = (i + 1) % totalNodes)
+            {
+                InetAddress candidate = tokenToEndPointMap.get(dcTokens.get(i));
+                if (!forloopReturn.contains(candidate))
+                {
+                    forloopReturn.add(candidate);
+                    foundCount++;
+                }
+            }
+            endpoints.addAll(forloopReturn);
+        }
+
+        return endpoints;
+    }
+
+    /**
+     * Creates the response handler for the given consistency level. For
+     * DCQUORUM it returns a DatacenterQuorumResponseHandler built from the
+     * local data center's endpoints and quorum size. For DCQUORUMSYNC it
+     * returns a DatacenterQuorumSyncResponseHandler built from the quorum
+     * sizes of all data centers. Any other level is delegated to the
+     * superclass.
+     */
+    @Override
+    public <T> QuorumResponseHandler<T> getResponseHandler(IResponseResolver<T> responseResolver, int blockFor, int consistency_level)
+    throws InvalidRequestException
+    {
+        if (consistency_level == ConsistencyLevel.DCQUORUM)
+        {
+            List<InetAddress> endpoints = getLocalEndPoints();
+            return new DatacenterQuorumResponseHandler<T>(endpoints, locQFactor, responseResolver);
+        }
+        else if (consistency_level == ConsistencyLevel.DCQUORUMSYNC)
+        {
+            return new DatacenterQuorumSyncResponseHandler<T>(getQuorumRepFactor(), responseResolver);
+        }
+        return super.getResponseHandler(responseResolver, blockFor, consistency_level);
+    }
+}
\ No newline at end of file

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/EndPointSnitch.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/EndPointSnitch.java?rev=829819&r1=829818&r2=829819&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/EndPointSnitch.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/EndPointSnitch.java Mon
Oct 26 15:00:20 2009
@@ -18,9 +18,8 @@
 
 package org.apache.cassandra.locator;
 
-import java.net.*;
-
 import java.net.InetAddress;
+import java.net.UnknownHostException;
 
 public class EndPointSnitch implements IEndPointSnitch
 {
@@ -30,13 +29,13 @@
          * Look at the IP Address of the two hosts. Compare 
          * the 3rd octet. If they are the same then the hosts
          * are in the same rack else different racks. 
-        */        
+        */
         byte[] ip = host.getAddress();
         byte[] ip2 = host2.getAddress();
-        
-        return ( ip[2] == ip2[2] );
+
+        return ip[2] == ip2[2];
     }
-    
+
     public boolean isInSameDataCenter(InetAddress host, InetAddress host2) throws UnknownHostException
     {
         /*
@@ -46,7 +45,12 @@
         */
         byte[] ip = host.getAddress();
         byte[] ip2 = host2.getAddress();
-        
-        return ( ip[1] == ip2[1] );
+
+        return ip[1] == ip2[1];
+    }
+
+    /**
+     * This snitch has no notion of named locations, so the call always fails
+     * with an UnknownHostException carrying the message "Not Supported".
+     */
+    public String getLocation(InetAddress endpoint) throws UnknownHostException
+    {
+        UnknownHostException notSupported = new UnknownHostException("Not Supported");
+        throw notSupported;
+    }
 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/IEndPointSnitch.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/IEndPointSnitch.java?rev=829819&r1=829818&r2=829819&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/IEndPointSnitch.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/IEndPointSnitch.java Mon
Oct 26 15:00:20 2009
@@ -27,6 +27,8 @@
  * This interface helps determine location of node in the data center relative to another
node.
  * Give a node A and another node B it can tell if A and B are on the same rack or in the
same
  * data center.
+ *
+ * Not all methods will be germane to all implementations.  Throw UnsupportedOperationException as necessary.
  */
 
 public interface IEndPointSnitch
@@ -48,4 +50,9 @@
      * @throws UnknownHostException
      */
     public boolean isInSameDataCenter(InetAddress host, InetAddress host2) throws UnknownHostException;
+    
+    /**
+     * Given an endpoint, returns the name of the datacenter in which that node is located.
+     */
+    public String getLocation(InetAddress endpoint) throws UnknownHostException;
 }

Added: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterQuorumResponseHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterQuorumResponseHandler.java?rev=829819&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterQuorumResponseHandler.java
(added)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterQuorumResponseHandler.java
Mon Oct 26 15:00:20 2009
@@ -0,0 +1,50 @@
+/**
+ *
+ */
+package org.apache.cassandra.service;
+
+import java.net.InetAddress;
+import java.util.List;
+
+import org.apache.cassandra.net.Message;
+
+/**
+ * Response handler that waits for a number of responses from a given list of
+ * endpoints. Every response received before the condition is signaled is
+ * recorded, but only responses whose sender is on the wait list count toward
+ * the number being waited for.
+ */
+public class DatacenterQuorumResponseHandler<T> extends QuorumResponseHandler<T>
+{
+    /** Endpoints whose responses count toward completion. */
+    private final List<InetAddress> waitList;
+    /** Number of counted responses still outstanding. */
+    private int pending;
+
+    public DatacenterQuorumResponseHandler(List<InetAddress> waitList, int blockFor, IResponseResolver<T> responseResolver)
+    throws InvalidRequestException
+    {
+        super(blockFor, responseResolver);
+        this.waitList = waitList;
+        this.pending = blockFor;
+    }
+
+    /**
+     * Records the message and signals the condition once enough responses
+     * from wait-list endpoints have arrived. Messages arriving after the
+     * condition has been signaled are ignored.
+     */
+    @Override
+    public void response(Message message)
+    {
+        if (!condition_.isSignaled())
+        {
+            boolean counted = waitList.contains(message.getFrom());
+            if (counted)
+            {
+                pending -= 1;
+            }
+            responses_.add(message);
+            // Nothing left to wait for: release the waiter.
+            if (pending <= 0)
+            {
+                condition_.signal();
+            }
+        }
+    }
+}

Added: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterQuorumSyncResponseHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterQuorumSyncResponseHandler.java?rev=829819&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterQuorumSyncResponseHandler.java
(added)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterQuorumSyncResponseHandler.java
Mon Oct 26 15:00:20 2009
@@ -0,0 +1,75 @@
+/**
+ *
+ */
+package org.apache.cassandra.service;
+
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.net.Message;
+
+/**
+ * Response handler that waits until, for every data center in the supplied
+ * map, the number of responses given for that data center has been received
+ * from endpoints located in it.
+ */
+public class DatacenterQuorumSyncResponseHandler<T> extends QuorumResponseHandler<T>
+{
+    /** Data center name -> responses received so far, for data centers still being waited on. */
+    private final Map<String, Integer> dcResponses = new HashMap<String, Integer>();
+    /** Data center name -> responses required; an entry is removed once it is satisfied. */
+    private final Map<String, Integer> responseCounts;
+
+    /**
+     * @param responseCounts   data center name -> number of responses required
+     *                         from it; copied, so the caller's map is not modified
+     * @param responseResolver resolver handed to the superclass
+     */
+    public DatacenterQuorumSyncResponseHandler(Map<String, Integer> responseCounts, IResponseResolver<T> responseResolver)
+    throws InvalidRequestException
+    {
+        // Completion is tracked by the map, so the superclass is just given 1.
+        super(1, responseResolver);
+        this.responseCounts = new HashMap<String, Integer>(responseCounts);
+    }
+
+    /**
+     * Records the message, counts it for the sender's data center, and signals
+     * the condition once no data center is left to wait for. Messages arriving
+     * after the condition has been signaled are ignored.
+     * <p/>
+     * A data center is considered satisfied by the response that brings its
+     * count up to the required number, not by the one after it.
+     *
+     * @throws RuntimeException wrapping the UnknownHostException if the snitch
+     *                          cannot locate the sender
+     */
+    @Override
+    public void response(Message message)
+    {
+        if (condition_.isSignaled())
+        {
+            return;
+        }
+        try
+        {
+            String dataCenter = DatabaseDescriptor.getEndPointSnitch().getLocation(message.getFrom());
+            Integer required = responseCounts.get(dataCenter);
+            // Only data centers that are still being waited on are counted.
+            if (required != null)
+            {
+                Integer seen = dcResponses.get(dataCenter);
+                int received = (seen == null ? 0 : seen) + 1;
+                if (received >= required)
+                {
+                    // Quorum reached for this data center; stop waiting on it.
+                    responseCounts.remove(dataCenter);
+                    dcResponses.remove(dataCenter);
+                }
+                else
+                {
+                    dcResponses.put(dataCenter, received);
+                }
+            }
+        }
+        catch (UnknownHostException e)
+        {
+            throw new RuntimeException(e);
+        }
+        responses_.add(message);
+        // Once every data center has been satisfied the map is empty.
+        if (responseCounts.isEmpty())
+        {
+            condition_.signal();
+        }
+    }
+}

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java?rev=829819&r1=829818&r2=829819&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java
(original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java
Mon Oct 26 15:00:20 2009
@@ -37,10 +37,10 @@
 
 public class QuorumResponseHandler<T> implements IAsyncCallback
 {
-    private static Logger logger_ = Logger.getLogger( QuorumResponseHandler.class );
-    private SimpleCondition condition_ = new SimpleCondition();
+    protected static Logger logger_ = Logger.getLogger( QuorumResponseHandler.class );
+    protected SimpleCondition condition_ = new SimpleCondition();
     private int responseCount_;
-    private List<Message> responses_ = new ArrayList<Message>();
+    protected List<Message> responses_ = new ArrayList<Message>();
     private IResponseResolver<T> responseResolver_;
     private long startTime_;
 

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=829819&r1=829818&r2=829819&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Mon
Oct 26 15:00:20 2009
@@ -72,27 +72,27 @@
      */
     private static Map<InetAddress, Message> createWriteMessages(RowMutation rm, Map<InetAddress, InetAddress> endpointMap) throws IOException
     {
-		Map<InetAddress, Message> messageMap = new HashMap<InetAddress, Message>();
-		Message message = rm.makeRowMutationMessage();
+        Map<InetAddress, Message> messageMap = new HashMap<InetAddress, Message>();
+        Message message = rm.makeRowMutationMessage();
 
-		for (Map.Entry<InetAddress, InetAddress> entry : endpointMap.entrySet())
-		{
+        for (Map.Entry<InetAddress, InetAddress> entry : endpointMap.entrySet())
+        {
             InetAddress target = entry.getKey();
             InetAddress hint = entry.getValue();
             if ( !target.equals(hint) )
-			{
-				Message hintedMessage = rm.makeRowMutationMessage();
-				hintedMessage.addHeader(RowMutation.HINT, hint.getAddress());
-				if (logger.isDebugEnabled())
-				    logger.debug("Sending the hint of " + hint + " to " + target);
-				messageMap.put(target, hintedMessage);
-			}
-			else
-			{
-				messageMap.put(target, message);
-			}
-		}
-		return messageMap;
+            {
+                Message hintedMessage = rm.makeRowMutationMessage();
+                hintedMessage.addHeader(RowMutation.HINT, hint.getAddress());
+                if (logger.isDebugEnabled())
+                    logger.debug("Sending the hint of " + hint + " to " + target);
+                messageMap.put(target, hintedMessage);
+            }
+            else
+            {
+                messageMap.put(target, message);
+            }
+        }
+        return messageMap;
     }
     
     /**
@@ -103,7 +103,7 @@
      * @param rm the mutation to be applied across the replicas
     */
     public static void insert(RowMutation rm)
-	{
+    {
         /*
          * Get the N nodes from storage service where the data needs to be
          * replicated
@@ -112,21 +112,21 @@
         */
 
         long startTime = System.currentTimeMillis();
-		try
-		{
+        try
+        {
             List<InetAddress> naturalEndpoints = StorageService.instance().getNaturalEndpoints(rm.key());
             // (This is the ZERO consistency level, so user doesn't care if we don't really have N destinations available.)
-			Map<InetAddress, InetAddress> endpointMap = StorageService.instance().getHintedEndpointMap(rm.key(), naturalEndpoints);
-			Map<InetAddress, Message> messageMap = createWriteMessages(rm, endpointMap);
-			for (Map.Entry<InetAddress, Message> entry : messageMap.entrySet())
-			{
+            Map<InetAddress, InetAddress> endpointMap = StorageService.instance().getHintedEndpointMap(rm.key(), naturalEndpoints);
+            Map<InetAddress, Message> messageMap = createWriteMessages(rm, endpointMap);
+            for (Map.Entry<InetAddress, Message> entry : messageMap.entrySet())
+            {
                 Message message = entry.getValue();
                 InetAddress endpoint = entry.getKey();
                 if (logger.isDebugEnabled())
                     logger.debug("insert writing key " + rm.key() + " to " + message.getMessageId() + "@" + endpoint);
                 MessagingService.instance().sendOneWay(message, endpoint);
-			}
-		}
+            }
+        }
         catch (IOException e)
         {
             throw new RuntimeException("error inserting key " + rm.key(), e);
@@ -159,7 +159,7 @@
             {
                 throw new UnavailableException();
             }
-            QuorumResponseHandler<Boolean> quorumResponseHandler = new QuorumResponseHandler<Boolean>(blockFor, new WriteResponseResolver());
+            QuorumResponseHandler<Boolean> quorumResponseHandler = StorageService.instance().getResponseHandler(new WriteResponseResolver(), blockFor, consistency_level);
             if (logger.isDebugEnabled())
                 logger.debug("insertBlocking writing key " + rm.key() + " to " + message.getMessageId() + "@[" + StringUtils.join(endpointMap.keySet(), ", ") + "]");
 
@@ -204,6 +204,7 @@
 
     private static int determineBlockFor(int naturalTargets, int hintedTargets, int consistency_level)
     {
+        // TODO this is broken for DC quorum / DC quorum sync
         int bootstrapTargets = hintedTargets - naturalTargets;
         int blockFor;
         if (consistency_level == ConsistencyLevel.ONE)
@@ -318,8 +319,8 @@
         }
         else
         {
-            assert consistency_level == ConsistencyLevel.QUORUM;
-            rows = strongRead(commands);
+            assert consistency_level >= ConsistencyLevel.QUORUM;
+            rows = strongRead(commands, consistency_level);
         }
 
         readStats.add(System.currentTimeMillis() - startTime);
@@ -339,7 +340,7 @@
          * 7. else carry out read repair by getting data from all the nodes.
         // 5. return success
      */
-    private static List<Row> strongRead(List<ReadCommand> commands) throws IOException, TimeoutException, InvalidRequestException, UnavailableException
+    private static List<Row> strongRead(List<ReadCommand> commands, int consistency_level) throws IOException, TimeoutException, InvalidRequestException, UnavailableException
     {
         List<QuorumResponseHandler<Row>> quorumResponseHandlers = new ArrayList<QuorumResponseHandler<Row>>();
         List<InetAddress[]> commandEndPoints = new ArrayList<InetAddress[]>();
@@ -357,7 +358,7 @@
             Message message = command.makeReadMessage();
             Message messageDigestOnly = readMessageDigestOnly.makeReadMessage();
 
-            QuorumResponseHandler<Row> quorumResponseHandler = new QuorumResponseHandler<Row>(DatabaseDescriptor.getQuorum(), new ReadResponseResolver());
+            QuorumResponseHandler<Row> quorumResponseHandler = StorageService.instance().getResponseHandler(new ReadResponseResolver(), DatabaseDescriptor.getQuorum(), consistency_level);
             InetAddress dataPoint = StorageService.instance().findSuitableEndPoint(command.key);
             List<InetAddress> endpointList = StorageService.instance().getNaturalEndpoints(command.key);
             /* Remove the local storage endpoint from the list. */

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=829819&r1=829818&r2=829819&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Mon Oct 26 15:00:20 2009
@@ -289,7 +289,7 @@
 
     public IEndPointSnitch getEndPointSnitch()
     {
-    	return endPointSnitch_;
+        return endPointSnitch_;
     }
     
     /*
@@ -577,10 +577,10 @@
      */
     public void takeSnapshot(String tableName, String tag) throws IOException
     {
-    	if (DatabaseDescriptor.getTable(tableName) == null)
+        if (DatabaseDescriptor.getTable(tableName) == null)
         {
             throw new IOException("Table " + tableName + "does not exist");
-    	}
+        }
         Table tableInstance = Table.open(tableName);
         tableInstance.snapshot(tag);
     }
@@ -592,11 +592,11 @@
      */
     public void takeAllSnapshot(String tag) throws IOException
     {
-    	for (String tableName: DatabaseDescriptor.getTables())
+        for (String tableName: DatabaseDescriptor.getTables())
         {
             Table tableInstance = Table.open(tableName);
             tableInstance.snapshot(tag);
-    	}
+        }
     }
 
     /**
@@ -604,11 +604,11 @@
      */
     public void clearSnapshot() throws IOException
     {
-    	for (String tableName: DatabaseDescriptor.getTables())
+        for (String tableName: DatabaseDescriptor.getTables())
         {
             Table tableInstance = Table.open(tableName);
             tableInstance.clearSnapshot();
-    	}
+        }
         if (logger_.isDebugEnabled())
             logger_.debug("Cleared out all snapshot directories");
     }
@@ -789,16 +789,16 @@
      */
     public List<InetAddress> getLiveNaturalEndpoints(String key)
     {
-    	List<InetAddress> liveEps = new ArrayList<InetAddress>();
-    	List<InetAddress> endpoints = getNaturalEndpoints(key);
-    	
-    	for ( InetAddress endpoint : endpoints )
-    	{
-    		if ( FailureDetector.instance().isAlive(endpoint) )
-    			liveEps.add(endpoint);
-    	}
-    	
-    	return liveEps;
+        List<InetAddress> liveEps = new ArrayList<InetAddress>();
+        List<InetAddress> endpoints = getNaturalEndpoints(key);
+        
+        for ( InetAddress endpoint : endpoints )
+        {
+            if ( FailureDetector.instance().isAlive(endpoint) )
+                liveEps.add(endpoint);
+        }
+        
+        return liveEps;
     }
 
     /**
@@ -817,45 +817,45 @@
      * This function finds the most suitable endpoint given a key.
      * It checks for locality and alive test.
      */
-	public InetAddress findSuitableEndPoint(String key) throws IOException, UnavailableException
-	{
-		List<InetAddress> endpoints = getNaturalEndpoints(key);
-		for(InetAddress endPoint: endpoints)
-		{
+    public InetAddress findSuitableEndPoint(String key) throws IOException, UnavailableException
+    {
+        List<InetAddress> endpoints = getNaturalEndpoints(key);
+        for(InetAddress endPoint: endpoints)
+        {
             if(endPoint.equals(FBUtilities.getLocalAddress()))
-			{
-				return endPoint;
-			}
-		}
-		int j = 0;
-		for ( ; j < endpoints.size(); ++j )
-		{
-			if ( StorageService.instance().isInSameDataCenter(endpoints.get(j)) && FailureDetector.instance().isAlive(endpoints.get(j)))
-			{
-				return endpoints.get(j);
-			}
-		}
-		// We have tried to be really nice but looks like there are no servers 
-		// in the local data center that are alive and can service this request so 
-		// just send it to the first alive guy and see if we get anything.
-		j = 0;
-		for ( ; j < endpoints.size(); ++j )
-		{
-			if ( FailureDetector.instance().isAlive(endpoints.get(j)))
-			{
-				if (logger_.isDebugEnabled())
-				  logger_.debug("InetAddress " + endpoints.get(j) + " is alive so get data from it.");
-				return endpoints.get(j);
-			}
-		}
+            {
+                return endPoint;
+            }
+        }
+        int j = 0;
+        for ( ; j < endpoints.size(); ++j )
+        {
+            if ( StorageService.instance().isInSameDataCenter(endpoints.get(j)) && FailureDetector.instance().isAlive(endpoints.get(j)))
+            {
+                return endpoints.get(j);
+            }
+        }
+        // We have tried to be really nice but looks like there are no servers 
+        // in the local data center that are alive and can service this request so 
+        // just send it to the first alive guy and see if we get anything.
+        j = 0;
+        for ( ; j < endpoints.size(); ++j )
+        {
+            if ( FailureDetector.instance().isAlive(endpoints.get(j)))
+            {
+                if (logger_.isDebugEnabled())
+                  logger_.debug("InetAddress " + endpoints.get(j) + " is alive so get data from it.");
+                return endpoints.get(j);
+            }
+        }
 
         throw new UnavailableException(); // no nodes that could contain key are alive
-	}
+    }
 
-	Map<Token, InetAddress> getLiveEndPointMap()
-	{
-	    return tokenMetadata_.cloneTokenEndPointMap();
-	}
+    Map<Token, InetAddress> getLiveEndPointMap()
+    {
+        return tokenMetadata_.cloneTokenEndPointMap();
+    }
 
     public void setLog4jLevel(String classQualifier, String rawLevel)
     {
@@ -890,4 +890,9 @@
         tokens.add(range.right().toString());
         return tokens;
     }
+
+    public <T> QuorumResponseHandler<T> getResponseHandler(IResponseResolver<T> responseResolver, int blockFor, int consistency_level) throws InvalidRequestException, UnavailableException
+    {
+        return replicationStrategy_.getResponseHandler(responseResolver, blockFor, consistency_level);
+    }
 }



Mime
View raw message