cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject svn commit: r930272 - in /cassandra/trunk: CHANGES.txt conf/storage-conf.xml src/java/org/apache/cassandra/config/CFMetaData.java src/java/org/apache/cassandra/config/DatabaseDescriptor.java src/java/org/apache/cassandra/service/StorageProxy.java
Date Fri, 02 Apr 2010 14:05:32 GMT
Author: jbellis
Date: Fri Apr  2 14:05:32 2010
New Revision: 930272

URL: http://svn.apache.org/viewvc?rev=930272&view=rev
Log:
add ReadRepairChance to CF definition.  patch by Matthew Dennis; reviewed by jbellis for CASSANDRA-930

Modified:
    cassandra/trunk/CHANGES.txt
    cassandra/trunk/conf/storage-conf.xml
    cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java
    cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
    cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java

Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=930272&r1=930271&r2=930272&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Fri Apr  2 14:05:32 2010
@@ -4,6 +4,7 @@ dev
  * switched to slf4j logging (CASSANDRA-625)
  * drain method to flush memtables and purge commit log prior to shutdown. (CASSANDRA-880)
  * access levels for authentication/authorization (CASSANDRA-900)
+ * add ReadRepairChance to CF definition (CASSANDRA-930)
 
 
 0.6.1

Modified: cassandra/trunk/conf/storage-conf.xml
URL: http://svn.apache.org/viewvc/cassandra/trunk/conf/storage-conf.xml?rev=930272&r1=930271&r2=930272&view=diff
==============================================================================
--- cassandra/trunk/conf/storage-conf.xml (original)
+++ cassandra/trunk/conf/storage-conf.xml Fri Apr  2 14:05:32 2010
@@ -82,6 +82,10 @@
        ~
        ~ An optional `Comment` attribute may be used to attach additional
        ~ human-readable information about the column family to its definition.
+       ~
+       ~ The optional ReadRepairChance attribute (from 0 to 1) specifies
+       ~ the probability with which read repairs should be invoked on non-quorum
+       ~ reads.  The default is 1.0 (always read repair).
        ~ 
        ~ The optional KeysCached attribute specifies
        ~ the number of keys per sstable whose locations we keep in
@@ -103,6 +107,7 @@
       <ColumnFamily Name="Standard1" CompareWith="BytesType"/>
       <ColumnFamily Name="Standard2" 
                     CompareWith="UTF8Type"
+                    ReadRepairChance="0.1"
                     KeysCached="100%"/>
       <ColumnFamily Name="StandardByUUID1" CompareWith="TimeUUIDType" />
       <ColumnFamily Name="Super1"

Modified: cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java?rev=930272&r1=930271&r2=930272&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java Fri Apr  2 14:05:32 2010
@@ -40,6 +40,7 @@ import org.apache.commons.lang.ObjectUti
 
 public final class CFMetaData
 {
+    public final static double DEFAULT_READ_REPAIR_CHANCE = 1.0;
     public final static double DEFAULT_KEY_CACHE_SIZE = 200000;
     public final static double DEFAULT_ROW_CACHE_SIZE = 0.0;
 
@@ -74,9 +75,11 @@ public final class CFMetaData
     public final String comment; // for humans only
     public final double rowCacheSize; // default 0
     public final double keyCacheSize; // default 0.01
+    public final double readRepairChance; //chance 0 to 1, of doing a read repair; defaults 1.0 (always)
     public final int cfId;
 
-    private CFMetaData(String tableName, String cfName, String columnType, AbstractType comparator, AbstractType subcolumnComparator, String comment, double rowCacheSize, double keyCacheSize, int cfId)
+
+    private CFMetaData(String tableName, String cfName, String columnType, AbstractType comparator, AbstractType subcolumnComparator, String comment, double rowCacheSize, double keyCacheSize, double readRepairChance, int cfId)
     {
         this.tableName = tableName;
         this.cfName = cfName;
@@ -86,6 +89,7 @@ public final class CFMetaData
         this.comment = comment;
         this.rowCacheSize = rowCacheSize;
         this.keyCacheSize = keyCacheSize;
+        this.readRepairChance = readRepairChance;
         this.cfId = cfId;
         currentCfNames.put(cfId, cfName);
         cfIdMap.put(new Pair<String, String>(tableName, cfName), cfId);
@@ -93,21 +97,26 @@ public final class CFMetaData
     
     public CFMetaData(String tableName, String cfName, String columnType, AbstractType comparator, AbstractType subcolumnComparator, String comment, double rowCacheSize, double keyCacheSize)
     {
-        this(tableName, cfName, columnType, comparator, subcolumnComparator, comment, rowCacheSize, keyCacheSize, nextId());
+        this(tableName, cfName, columnType, comparator, subcolumnComparator, comment, rowCacheSize, keyCacheSize, DEFAULT_READ_REPAIR_CHANCE, nextId());
     }
-    
+
+    public CFMetaData(String tableName, String cfName, String columnType, AbstractType comparator, AbstractType subcolumnComparator, String comment, double rowCacheSize, double keyCacheSize, double readRepairChance)
+    {
+        this(tableName, cfName, columnType, comparator, subcolumnComparator, comment, rowCacheSize, keyCacheSize, readRepairChance, nextId());
+    }
+
     /** clones an existing CFMetaData using the same id. */
     public static CFMetaData rename(CFMetaData cfm, String newName)
     {
         purge(cfm);
-        return new CFMetaData(cfm.tableName, newName, cfm.columnType, cfm.comparator, cfm.subcolumnComparator, cfm.comment, cfm.rowCacheSize, cfm.keyCacheSize, cfm.cfId);
+        return new CFMetaData(cfm.tableName, newName, cfm.columnType, cfm.comparator, cfm.subcolumnComparator, cfm.comment, cfm.rowCacheSize, cfm.keyCacheSize, cfm.readRepairChance, cfm.cfId);
     }
     
     /** clones existing CFMetaData. keeps the id but changes the table name.*/
     public static CFMetaData renameTable(CFMetaData cfm, String tableName)
     {
         purge(cfm);
-        return new CFMetaData(tableName, cfm.cfName, cfm.columnType, cfm.comparator, cfm.subcolumnComparator, cfm.comment, cfm.rowCacheSize, cfm.keyCacheSize, cfm.cfId);
+        return new CFMetaData(tableName, cfm.cfName, cfm.columnType, cfm.comparator, cfm.subcolumnComparator, cfm.comment, cfm.rowCacheSize, cfm.keyCacheSize, cfm.readRepairChance, cfm.cfId);
     }
     
     /** used for evicting cf data out of static tracking collections. */
@@ -141,6 +150,7 @@ public final class CFMetaData
             dout.writeUTF(cfm.comment);
         dout.writeDouble(cfm.rowCacheSize);
         dout.writeDouble(cfm.keyCacheSize);
+        dout.writeDouble(cfm.readRepairChance);
         dout.writeInt(cfm.cfId);
         dout.close();
         return bout.toByteArray();
@@ -173,10 +183,11 @@ public final class CFMetaData
         String comment = din.readBoolean() ? din.readUTF() : null;
         double rowCacheSize = din.readDouble();
         double keyCacheSize = din.readDouble();
+        double readRepairChance = din.readDouble();
         int cfId = din.readInt();
-        return new CFMetaData(tableName, cfName, columnType, comparator, subcolumnComparator, comment, rowCacheSize, keyCacheSize, cfId);
+        return new CFMetaData(tableName, cfName, columnType, comparator, subcolumnComparator, comment, rowCacheSize, keyCacheSize, readRepairChance, cfId);
     }
-
+    
     public boolean equals(Object obj)
     {
         if (!(obj instanceof CFMetaData))
@@ -190,6 +201,7 @@ public final class CFMetaData
                 && ObjectUtils.equals(other.comment, comment)
                 && other.rowCacheSize == rowCacheSize
                 && other.keyCacheSize == keyCacheSize
+                && other.readRepairChance == readRepairChance
                 && other.cfId == cfId;
     }
     

Modified: cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=930272&r1=930271&r2=930272&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java Fri Apr  2 14:05:32 2010
@@ -105,12 +105,6 @@ public class DatabaseDescriptor
     private static int memtableThroughput = 64;
     /* Number of objects in millions in the memtable before it is dumped */
     private static double memtableOperations = 0.1;
-    /* 
-     * This parameter enables or disables consistency checks. 
-     * If set to false the read repairs are disable for very
-     * high throughput on reads but at the cost of consistency.
-    */
-    private static boolean doConsistencyCheck = true;
     /* Job Jar Location */
     private static String jobJarFileLocation;
     /* Address where to run the job tracker */
@@ -429,13 +423,6 @@ public class DatabaseDescriptor
                 throw new ConfigurationException("Memtable object count must be a positive double");
             }
 
-            /* This parameter enables or disables consistency checks.
-             * If set to false the read repairs are disable for very
-             * high throughput on reads but at the cost of consistency.*/
-            String doConsistency = xmlUtils.getNodeValue("/Storage/DoConsistencyChecksBoolean");
-            if ( doConsistency != null )
-                doConsistencyCheck = Boolean.parseBoolean(doConsistency);
-
             /* read the size at which we should do column indexes */
             String columnIndexSize = xmlUtils.getNodeValue("/Storage/ColumnIndexSizeInKB");
             if(columnIndexSize == null)
@@ -705,12 +692,22 @@ public class DatabaseDescriptor
                         rowCacheSize = FBUtilities.parseDoubleOrPercent(value);
                     }
 
+                    double readRepairChance = CFMetaData.DEFAULT_READ_REPAIR_CHANCE;
+                    if ((value = XMLUtils.getAttributeValue(columnFamily, "ReadRepairChance")) != null)
+                    {
+                        readRepairChance = FBUtilities.parseDoubleOrPercent(value);
+                        if (readRepairChance < 0.0 || readRepairChance > 1.0)
+                        {                        
+                            throw new ConfigurationException("ReadRepairChance must be between 0.0 and 1.0");
+                        }
+                    }
+
                     // Parse out user-specified logical names for the various dimensions
                     // of a the column family from the config.
                     String comment = xmlUtils.getNodeValue(xqlCF + "Comment");
 
                     // insert it into the table dictionary.
-                    cfDefs[j] = new CFMetaData(tableName, cfName, columnType, comparator, subcolumnComparator, comment, rowCacheSize, keyCacheSize);
+                    cfDefs[j] = new CFMetaData(tableName, cfName, columnType, comparator, subcolumnComparator, comment, rowCacheSize, keyCacheSize, readRepairChance);
                 }
 
                 KSMetaData meta = new KSMetaData(ksName, strategyClass, replicationFactor, snitch, cfDefs);
@@ -878,11 +875,6 @@ public class DatabaseDescriptor
       return memtableOperations;
     }
 
-    public static boolean getConsistencyCheck()
-    {
-      return doConsistencyCheck;
-    }
-
     public static String getClusterName()
     {
         return clusterName;

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=930272&r1=930271&r2=930272&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Fri Apr  2 14:05:32 2010
@@ -30,6 +30,7 @@ import java.util.concurrent.TimeoutExcep
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
+import org.apache.cassandra.config.CFMetaData;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.commons.lang.ArrayUtils;
@@ -61,6 +62,7 @@ public class StorageProxy implements Sto
 {
     private static final Logger logger = LoggerFactory.getLogger(StorageProxy.class);
 
+    private static final Random random = new Random();
     // mbean stuff
     private static final LatencyTracker readStats = new LatencyTracker();
     private static final LatencyTracker rangeStats = new LatencyTracker();
@@ -357,7 +359,7 @@ public class StorageProxy implements Sto
 
             if (logger.isDebugEnabled())
                 logger.debug("weakreadremote reading " + command + " from " + message.getMessageId() + "@" + endPoint);
-            if (DatabaseDescriptor.getConsistencyCheck())
+            if (randomlyReadRepair(command))
                 message.setHeader(ReadCommand.DO_REPAIR, ReadCommand.DO_REPAIR.getBytes());
             iars.add(MessagingService.instance.sendRR(message, endPoint));
         }
@@ -491,7 +493,7 @@ public class StorageProxy implements Sto
             }
             catch (DigestMismatchException ex)
             {
-                if (DatabaseDescriptor.getConsistencyCheck())
+                if (randomlyReadRepair(command))
                 {
                     IResponseResolver<Row> readResponseResolverRepair = new ReadResponseResolver(command.table, DatabaseDescriptor.getQuorum(command.table));
                     QuorumResponseHandler<Row> quorumResponseHandlerRepair = new QuorumResponseHandler<Row>(
@@ -697,6 +699,12 @@ public class StorageProxy implements Sto
         }
         return ranges;
     }
+    
+    private static boolean randomlyReadRepair(ReadCommand command)
+    {
+        CFMetaData cfmd = DatabaseDescriptor.getTableMetaData(command.table).get(command.getColumnFamilyName());
+        return cfmd.readRepairChance > random.nextDouble();
+    }
 
     public long getReadOperations()
     {
@@ -761,7 +769,7 @@ public class StorageProxy implements Sto
             Row row = command.getRow(table);
 
             // Do the consistency checks in the background
-            if (DatabaseDescriptor.getConsistencyCheck())
+            if (randomlyReadRepair(command))
             {
                 List<InetAddress> endpoints = StorageService.instance.getLiveNaturalEndpoints(command.table, command.key);
                 if (endpoints.size() > 1)



Mime
View raw message