cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject svn commit: r1060939 - in /cassandra/trunk: ./ conf/ interface/thrift/gen-java/org/apache/cassandra/thrift/ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/streaming/ test/distributed/org/apache/cassandra/
Date Wed, 19 Jan 2011 19:54:54 GMT
Author: jbellis
Date: Wed Jan 19 19:54:53 2011
New Revision: 1060939

URL: http://svn.apache.org/viewvc?rev=1060939&view=rev
Log:
merge from 0.7

Modified:
    cassandra/trunk/   (props changed)
    cassandra/trunk/CHANGES.txt
    cassandra/trunk/conf/cassandra-env.sh
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java   (props changed)
    cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java
    cassandra/trunk/test/distributed/org/apache/cassandra/MutationTest.java

Propchange: cassandra/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Jan 19 19:54:53 2011
@@ -1,5 +1,5 @@
 /cassandra/branches/cassandra-0.6:922689-1052356,1052358-1053452,1053454,1053456-1055311,1056121,1057932
-/cassandra/branches/cassandra-0.7:1026516-1060910
+/cassandra/branches/cassandra-0.7:1026516-1060938
 /cassandra/branches/cassandra-0.7.0:1053690-1055654
 /cassandra/tags/cassandra-0.7.0-rc3:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3:774578-796573

Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1060939&r1=1060938&r2=1060939&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Wed Jan 19 19:54:53 2011
@@ -20,7 +20,7 @@
  * implement describeOwnership for BOP, COPP (CASSANDRA-1928)
  * make read repair behave as expected for ConsistencyLevel > ONE
    (CASSANDRA-982)
- * distributed test harness (CASSANDRA-1859)
+ * distributed test harness (CASSANDRA-1859, 1964)
  * reduce flush lock contention (CASSANDRA-1930)
  * optimize supercolumn deserialization (CASSANDRA-1891)
  * fix CFMetaData.apply to only compare objects of the same class 
@@ -33,6 +33,8 @@
  * fixes for contrib/javautils (CASSANDRA-1979)
  * check more frequently for memtable expiration (CASSANDRA-2000)
  * fix writing SSTable column count statistics (CASSANDRA-1976)
+ * fix streaming of multiple CFs during bootstrap (CASSANDRA-1992)
+ * explicitly set JVM GC new generation size with -Xmn (CASSANDRA-1968)
 
 
 0.7.0-final

Modified: cassandra/trunk/conf/cassandra-env.sh
URL: http://svn.apache.org/viewvc/cassandra/trunk/conf/cassandra-env.sh?rev=1060939&r1=1060938&r2=1060939&view=diff
==============================================================================
--- cassandra/trunk/conf/cassandra-env.sh (original)
+++ cassandra/trunk/conf/cassandra-env.sh Wed Jan 19 19:54:53 2011
@@ -19,25 +19,46 @@ calculate_heap_size()
     case "`uname`" in
         Linux)
             system_memory_in_mb=`free -m | awk '/Mem:/ {print $2}'`
-            MAX_HEAP_SIZE=$((system_memory_in_mb / 2))M
-            return 0
+            system_cpu_cores=`cat /proc/cpuinfo | egrep '^processor(\s|\t)+:.*' | wc -l`
+            break
         ;;
         FreeBSD)
             system_memory_in_bytes=`sysctl hw.physmem | awk '{print $2}'`
-            MAX_HEAP_SIZE=$((system_memory_in_bytes / 1024 / 1024 / 2))M
-            return 0
+            system_memory_in_mb=$((system_memory_in_bytes / 1024 / 1024))
+            system_cpu_cores=`sysctl hw.ncpu | awk '{print $2}'`
+            break
         ;;
         *)
             MAX_HEAP_SIZE=1024M
+            HEAP_NEWSIZE=256M
             return 1
         ;;
     esac
+    max_heap_size_in_mb=$((system_memory_in_mb / 2))
+    MAX_HEAP_SIZE=${max_heap_size_in_mb}M
+
+    # Young gen: min(max_sensible_per_modern_cpu_core * num_cores, 1/4 * heap size)
+    max_sensible_yg_per_core_in_mb="100"
+    max_sensible_yg_in_mb=$((max_sensible_yg_per_core_in_mb * system_cpu_cores))
+
+    desired_yg_in_mb=$((max_heap_size_in_mb / 4))
+
+    if [ "$desired_yg_in_mb" -gt "$max_sensible_yg_in_mb" ]
+    then
+        HEAP_NEWSIZE="${max_sensible_yg_in_mb}M"
+    else
+        HEAP_NEWSIZE="${desired_yg_in_mb}M"
+    fi
+
+    return 0
 }
 
 # The amount of memory to allocate to the JVM at startup, you almost
 # certainly want to adjust this for your environment. If left commented
 # out, the heap size will be automatically determined by calculate_heap_size
 # MAX_HEAP_SIZE="4G"
+# set this to explicity control the size of the young generation
+# HEAP_NEWSIZE="1G"
 
 if [ "x$MAX_HEAP_SIZE" = "x" ]; then
     calculate_heap_size
@@ -68,6 +89,7 @@ JVM_OPTS="$JVM_OPTS -XX:ThreadPriorityPo
 # out.
 JVM_OPTS="$JVM_OPTS -Xms$MAX_HEAP_SIZE"
 JVM_OPTS="$JVM_OPTS -Xmx$MAX_HEAP_SIZE"
+JVM_OPTS="$JVM_OPTS -Xmn$HEAP_NEWSIZE"
 JVM_OPTS="$JVM_OPTS -XX:+HeapDumpOnOutOfMemoryError" 
 
 if [ "`uname`" = "Linux" ] ; then

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Jan 19 19:54:53 2011
@@ -1,5 +1,5 @@
 /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1052356,1052358-1053452,1053454,1053456-1055311,1056121,1057932
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1060910
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1060938
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1053690-1055654
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/Cassandra.java:774578-796573

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Jan 19 19:54:53 2011
@@ -1,5 +1,5 @@
 /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1052356,1052358-1053452,1053454,1053456-1055311,1056121,1057932
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1060910
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1060938
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1053690-1055654
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/column_t.java:774578-792198

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Jan 19 19:54:53 2011
@@ -1,5 +1,5 @@
 /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1052356,1052358-1053452,1053454,1053456-1055311,1056121,1057932
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1060910
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1060938
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1053690-1055654
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:774578-796573

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Jan 19 19:54:53 2011
@@ -1,5 +1,5 @@
 /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1052356,1052358-1053452,1053454,1053456-1055311,1056121,1057932
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1060910
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1060938
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1053690-1055654
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:774578-796573

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Jan 19 19:54:53 2011
@@ -1,5 +1,5 @@
 /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1052356,1052358-1053452,1053454,1053456-1055311,1056121,1057932
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1060910
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1060938
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1053690-1055654
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/superColumn_t.java:774578-792198

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1060939&r1=1060938&r2=1060939&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Wed Jan 19 19:54:53 2011
@@ -923,6 +923,7 @@ public class ColumnFamilyStore implement
      */
     public void addSSTable(SSTableReader sstable)
     {
+        assert sstable.getColumnFamilyName().equals(columnFamily);
         ssTables.add(Arrays.asList(sstable));
         CompactionManager.instance.submitMinorIfNeeded(this);
     }

Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java?rev=1060939&r1=1060938&r2=1060939&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java Wed Jan 19 19:54:53 2011
@@ -48,7 +48,6 @@ public class StreamInSession
     private final Runnable callback;
     private String table;
     private final List<Future<SSTableReader>> buildFutures = new ArrayList<Future<SSTableReader>>();
-    private ColumnFamilyStore cfs;
     private PendingFile current;
 
     private StreamInSession(Pair<InetAddress, Long> context, Runnable callback)
@@ -92,13 +91,11 @@ public class StreamInSession
 
     public void addFiles(Collection<PendingFile> files)
     {
-        for(PendingFile file : files)
+        for (PendingFile file : files)
         {
             if(logger.isDebugEnabled())
                 logger.debug("Adding file {} to Stream Request queue", file.getFilename());
             this.files.add(file);
-            if (cfs == null)
-                cfs = Table.open(file.desc.ksname).getColumnFamilyStore(file.desc.cfname);
         }
     }
 
@@ -130,16 +127,20 @@ public class StreamInSession
         if (files.isEmpty())
         {
             // wait for bloom filters and row indexes to finish building
-            List<SSTableReader> sstables = new ArrayList<SSTableReader>(buildFutures.size());
+            HashMap <ColumnFamilyStore, List<SSTableReader>> cfstores = new HashMap<ColumnFamilyStore, List<SSTableReader>>();
             for (Future<SSTableReader> future : buildFutures)
             {
                 try
                 {
                     SSTableReader sstable = future.get();
+                    assert sstable.getTableName().equals(table);
                     if (sstable == null)
                         continue;
+                    ColumnFamilyStore cfs = Table.open(sstable.getTableName()).getColumnFamilyStore(sstable.getColumnFamilyName());
                     cfs.addSSTable(sstable);
-                    sstables.add(sstable);
+                    if (!cfstores.containsKey(cfs))
+                        cfstores.put(cfs, new ArrayList<SSTableReader>());
+                    cfstores.get(cfs).add(sstable);
                 }
                 catch (InterruptedException e)
                 {
@@ -152,8 +153,11 @@ public class StreamInSession
             }
 
             // build secondary indexes
-            if (cfs != null && !cfs.getIndexedColumns().isEmpty())
-                cfs.buildSecondaryIndexes(sstables, cfs.getIndexedColumns());
+            for (Map.Entry<ColumnFamilyStore, List<SSTableReader>> entry : cfstores.entrySet())
+            {
+                if (entry.getKey() != null && !entry.getKey().getIndexedColumns().isEmpty())
+                    entry.getKey().buildSecondaryIndexes(entry.getValue(), entry.getKey().getIndexedColumns());
+            }
 
             // send reply to source that we're done
             StreamReply reply = new StreamReply("", getSessionId(), StreamReply.Status.SESSION_FINISHED);

Modified: cassandra/trunk/test/distributed/org/apache/cassandra/MutationTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/distributed/org/apache/cassandra/MutationTest.java?rev=1060939&r1=1060938&r2=1060939&view=diff
==============================================================================
--- cassandra/trunk/test/distributed/org/apache/cassandra/MutationTest.java (original)
+++ cassandra/trunk/test/distributed/org/apache/cassandra/MutationTest.java Wed Jan 19 19:54:53 2011
@@ -26,16 +26,18 @@ import java.io.IOException;
 import java.io.Writer;
 import java.net.InetAddress;
 import java.nio.ByteBuffer;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.ArrayList;
+import java.util.*;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.thrift.*;
 import org.apache.cassandra.tools.NodeProbe;
 import org.apache.cassandra.utils.WrappedRunnable;
-import  org.apache.thrift.TException;
+import org.apache.thrift.TException;
 import org.apache.cassandra.client.*;
 import org.apache.cassandra.dht.RandomPartitioner;
+import org.apache.cassandra.service.StorageService;
 
 import org.apache.cassandra.CassandraServiceController.Failure;
 
@@ -48,6 +50,8 @@ import static junit.framework.Assert.ass
 
 public class MutationTest extends TestBase
 {
+    private static final Logger logger = LoggerFactory.getLogger(MutationTest.class);
+
     @Test
     public void testInsert() throws Exception
     {
@@ -62,9 +66,9 @@ public class MutationTest extends TestBa
         insert(client, key, "Standard1", "c1", "v1", 0, ConsistencyLevel.ONE);
         insert(client, key, "Standard1", "c2", "v2", 0, ConsistencyLevel.ONE);
 
-
-        assertColumnEqual("c1", "v1", 0, getColumn(client, key, "Standard1", "c1", ConsistencyLevel.ONE));
-        assertColumnEqual("c2", "v2", 0, getColumn(client, key, "Standard1", "c2", ConsistencyLevel.ONE));
+        // block until the column is available
+        new Get(client, "Standard1", key).name("c1").value("v1").perform(ConsistencyLevel.ONE);
+        new Get(client, "Standard1", key).name("c2").value("v2").perform(ConsistencyLevel.ONE);
 
         List<ColumnOrSuperColumn> coscs = get_slice(client, key, "Standard1", ConsistencyLevel.ONE);
         assertColumnEqual("c1", "v1", 0, coscs.get(0).column);
@@ -84,24 +88,22 @@ public class MutationTest extends TestBa
         ByteBuffer key = newKey();
 
         insert(client, key, "Standard1", "c1", "v1", 0, ConsistencyLevel.ALL);
+        // should be instantly available
         assertColumnEqual("c1", "v1", 0, getColumn(client, key, "Standard1", "c1", ConsistencyLevel.ONE));
 
         List<InetAddress> endpoints = endpointsForKey(hosts.get(0), key, keyspace);
         InetAddress coordinator = nonEndpointForKey(hosts.get(0), key, keyspace);
         Failure failure = controller.failHosts(endpoints.subList(1, endpoints.size()));
 
-        Thread.sleep(10000); // let gossip catch up
-
         try {
             client = controller.createClient(coordinator);
             client.set_keyspace(keyspace);
 
-            assertColumnEqual("c1", "v1", 0, getColumn(client, key, "Standard1", "c1", ConsistencyLevel.ONE));
+            new Get(client, "Standard1", key).name("c1").value("v1")
+                .perform(ConsistencyLevel.ONE);
 
-            insert(client, key, "Standard1", "c3", "v3", 0, ConsistencyLevel.ALL);
-            assert false;
-        } catch (UnavailableException e) {
-            // [this is good]
+            new Insert(client, "Standard1", key).name("c3").value("v3")
+                .expecting(UnavailableException.class).perform(ConsistencyLevel.ALL);
         } finally {
             failure.resolve();
             Thread.sleep(10000);
@@ -125,26 +127,21 @@ public class MutationTest extends TestBa
         InetAddress coordinator = nonEndpointForKey(hosts.get(0), key, keyspace);
         Failure failure = controller.failHosts(endpoints.subList(1, endpoints.size())); //kill all but one nodes
 
-        Thread.sleep(10000);
         client = controller.createClient(coordinator);
         client.set_keyspace(keyspace);
         try {
-            insert(client, key, "Standard1", "c1", "v1", 0, ConsistencyLevel.QUORUM);
-            assert false;
-        } catch (UnavailableException e) {
-            // [this is good]
+            new Insert(client, "Standard1", key).name("c1").value("v1")
+                .expecting(UnavailableException.class).perform(ConsistencyLevel.QUORUM);
         } finally {
             failure.resolve();
-            Thread.sleep(10000);
         }
 
         // with all nodes up
-        insert(client, key, "Standard1", "c2", "v2", 0, ConsistencyLevel.QUORUM);
+        new Insert(client, "Standard1", key).name("c2").value("v2").perform(ConsistencyLevel.QUORUM);
 
         failure = controller.failHosts(endpoints.get(0));
-        Thread.sleep(10000);
         try {
-            getColumn(client, key, "Standard1", "c2", ConsistencyLevel.QUORUM);
+            new Get(client, "Standard1", key).name("c2").value("v2").perform(ConsistencyLevel.QUORUM);
         } finally {
             failure.resolve();
             Thread.sleep(10000);
@@ -180,12 +177,9 @@ public class MutationTest extends TestBa
             // read with all (success)
 
         Failure failure = controller.failHosts(endpoints);
-        Thread.sleep(10000);
         try {
-            insert(client, key, "Standard1", "c2", "v2", 0, ConsistencyLevel.ONE);
-            assert false;
-        } catch (UnavailableException e) {
-            // this is good
+            new Insert(client, "Standard1", key).name("c2").value("v2")
+                .expecting(UnavailableException.class).perform(ConsistencyLevel.ONE);
         } finally {
             failure.resolve();
         }
@@ -210,6 +204,122 @@ public class MutationTest extends TestBa
         return client.get(key, cpath, cl).column;
     }
 
+    protected class Get extends RetryingAction
+    {
+        public Get(Cassandra.Client client, String cf, ByteBuffer key)
+        {
+            super(client, cf, key);
+        }
+
+        public void tryPerformAction(ConsistencyLevel cl) throws Exception
+        {
+            assertColumnEqual(name, value, timestamp, getColumn(client, key, cf, name, cl));
+        }
+    }
+
+    protected class Insert extends RetryingAction
+    {
+        public Insert(Cassandra.Client client, String cf, ByteBuffer key)
+        {
+            super(client, cf, key);
+        }
+
+        public void tryPerformAction(ConsistencyLevel cl) throws Exception
+        {
+            insert(client, key, cf, name, value, timestamp, cl);
+        }
+    }
+
+    /** Performs an action repeatedly until timeout, success or failure. */
+    protected abstract class RetryingAction
+    {
+        protected Cassandra.Client client;
+        protected String cf;
+        protected ByteBuffer key;
+        protected String name;
+        protected String value;
+        protected long timestamp;
+
+        private Set<Class<Exception>> expected = new HashSet<Class<Exception>>();
+        private long timeout = StorageService.RING_DELAY;
+
+        public RetryingAction(Cassandra.Client client, String cf, ByteBuffer key)
+        {
+            this.client = client;
+            this.cf = cf;
+            this.key = key;
+            this.timestamp = 0;
+        }
+
+        public RetryingAction name(String name)
+        {
+            this.name = name; return this;
+        }
+
+        /** The value to expect for the return column, or null to expect the column to be missing. */
+        public RetryingAction value(String value)
+        {
+            this.value = value; return this;
+        }
+        
+        /** The total time to allow before failing. */
+        public RetryingAction timeout(long timeout)
+        {
+            this.timeout = timeout; return this;
+        }
+
+        /** The expected timestamp of the returned column. */
+        public RetryingAction timestamp(long timestamp)
+        {
+            this.timestamp = timestamp; return this;
+        }
+
+        /** The exception classes that indicate success. */
+        public RetryingAction expecting(Class... tempExceptions)
+        {
+            this.expected.clear();
+            for (Class exclass : tempExceptions)
+                expected.add((Class<Exception>)exclass);
+            return this;
+        }
+
+        public void perform(ConsistencyLevel cl) throws AssertionError
+        {
+            long deadline = System.currentTimeMillis() + timeout;
+            int attempts = 0;
+            String template = "%s for " + this + " after %d attempt(s) with %d ms to spare.";
+            Exception e = null;
+            while(deadline > System.currentTimeMillis())
+            {
+                try
+                {
+                    attempts++;
+                    tryPerformAction(cl);
+                    logger.info(String.format(template, "Succeeded", attempts, deadline - System.currentTimeMillis()));
+                    return;
+                }
+                catch (Exception ex)
+                {
+                    e = ex;
+                    if (!expected.contains(ex.getClass()))
+                        continue;
+                    logger.info(String.format(template, "Caught expected exception: " + e, attempts, deadline - System.currentTimeMillis()));
+                    return;
+                }
+            }
+            String err = String.format(template, "Caught unexpected: " + e, attempts, deadline - System.currentTimeMillis());
+            logger.error(err);
+            throw new AssertionError(err);
+        }
+        
+        public String toString()
+        {
+            return this.getClass() + "(" + key + "," + name + ")";
+        }
+
+        protected abstract void tryPerformAction(ConsistencyLevel cl) throws Exception;
+    }
+
     protected List<ColumnOrSuperColumn> get_slice(Cassandra.Client client, ByteBuffer key, String cf, ConsistencyLevel cl)
       throws InvalidRequestException, UnavailableException, TimedOutException, TException
     {



Mime
View raw message