Return-Path: Delivered-To: apmail-cassandra-commits-archive@www.apache.org Received: (qmail 38311 invoked from network); 19 Jan 2011 19:53:13 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 19 Jan 2011 19:53:13 -0000 Received: (qmail 26164 invoked by uid 500); 19 Jan 2011 19:53:13 -0000 Delivered-To: apmail-cassandra-commits-archive@cassandra.apache.org Received: (qmail 26140 invoked by uid 500); 19 Jan 2011 19:53:13 -0000 Mailing-List: contact commits-help@cassandra.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cassandra.apache.org Delivered-To: mailing list commits@cassandra.apache.org Received: (qmail 26132 invoked by uid 99); 19 Jan 2011 19:53:12 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 19 Jan 2011 19:53:12 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 19 Jan 2011 19:53:11 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id DE1A02388A3C; Wed, 19 Jan 2011 19:52:51 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1060938 - in /cassandra/branches/cassandra-0.7: CHANGES.txt test/distributed/org/apache/cassandra/MutationTest.java Date: Wed, 19 Jan 2011 19:52:51 -0000 To: commits@cassandra.apache.org From: jbellis@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20110119195251.DE1A02388A3C@eris.apache.org> Author: jbellis Date: Wed Jan 19 19:52:51 2011 New Revision: 1060938 URL: http://svn.apache.org/viewvc?rev=1060938&view=rev Log: fix distributed-test MutationTest patch by stuhood; reviewed by Pavel Yaskevich for CASSANDRA-1964 Modified: cassandra/branches/cassandra-0.7/CHANGES.txt cassandra/branches/cassandra-0.7/test/distributed/org/apache/cassandra/MutationTest.java Modified: cassandra/branches/cassandra-0.7/CHANGES.txt URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/CHANGES.txt?rev=1060938&r1=1060937&r2=1060938&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/CHANGES.txt (original) +++ cassandra/branches/cassandra-0.7/CHANGES.txt Wed Jan 19 19:52:51 2011 @@ -12,7 +12,7 @@ * implement describeOwnership for BOP, COPP (CASSANDRA-1928) * make read repair behave as expected for ConsistencyLevel > ONE (CASSANDRA-982) - * distributed test harness (CASSANDRA-1859) + * distributed test harness (CASSANDRA-1859, 1964) * reduce flush lock contention (CASSANDRA-1930) * optimize supercolumn deserialization (CASSANDRA-1891) * fix CFMetaData.apply to only compare objects of the same class Modified: cassandra/branches/cassandra-0.7/test/distributed/org/apache/cassandra/MutationTest.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/test/distributed/org/apache/cassandra/MutationTest.java?rev=1060938&r1=1060937&r2=1060938&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/test/distributed/org/apache/cassandra/MutationTest.java (original) +++ cassandra/branches/cassandra-0.7/test/distributed/org/apache/cassandra/MutationTest.java Wed Jan 19 19:52:51 2011 @@ -26,16 +26,18 @@ import java.io.IOException; import java.io.Writer; import java.net.InetAddress; import java.nio.ByteBuffer; -import java.util.LinkedList; -import java.util.List; -import java.util.ArrayList; +import java.util.*; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.cassandra.thrift.*; import org.apache.cassandra.tools.NodeProbe; import org.apache.cassandra.utils.WrappedRunnable; -import org.apache.thrift.TException; +import org.apache.thrift.TException; import org.apache.cassandra.client.*; import org.apache.cassandra.dht.RandomPartitioner; +import org.apache.cassandra.service.StorageService; import org.apache.cassandra.CassandraServiceController.Failure; @@ -48,6 +50,8 @@ import static junit.framework.Assert.ass public class MutationTest extends TestBase { + private static final Logger logger = LoggerFactory.getLogger(MutationTest.class); + @Test public void testInsert() throws Exception { @@ -62,9 +66,9 @@ public class MutationTest extends TestBa insert(client, key, "Standard1", "c1", "v1", 0, ConsistencyLevel.ONE); insert(client, key, "Standard1", "c2", "v2", 0, ConsistencyLevel.ONE); - - assertColumnEqual("c1", "v1", 0, getColumn(client, key, "Standard1", "c1", ConsistencyLevel.ONE)); - assertColumnEqual("c2", "v2", 0, getColumn(client, key, "Standard1", "c2", ConsistencyLevel.ONE)); + // block until the column is available + new Get(client, "Standard1", key).name("c1").value("v1").perform(ConsistencyLevel.ONE); + new Get(client, "Standard1", key).name("c2").value("v2").perform(ConsistencyLevel.ONE); List coscs = get_slice(client, key, "Standard1", ConsistencyLevel.ONE); assertColumnEqual("c1", "v1", 0, coscs.get(0).column); @@ -84,24 +88,22 @@ public class MutationTest extends TestBa ByteBuffer key = newKey(); insert(client, key, "Standard1", "c1", "v1", 0, ConsistencyLevel.ALL); + // should be instantly available assertColumnEqual("c1", "v1", 0, getColumn(client, key, "Standard1", "c1", ConsistencyLevel.ONE)); List endpoints = endpointsForKey(hosts.get(0), key, keyspace); InetAddress coordinator = nonEndpointForKey(hosts.get(0), key, keyspace); Failure failure = controller.failHosts(endpoints.subList(1, endpoints.size())); - Thread.sleep(10000); // let gossip catch up - try { client = controller.createClient(coordinator); client.set_keyspace(keyspace); - assertColumnEqual("c1", "v1", 0, getColumn(client, key, "Standard1", "c1", ConsistencyLevel.ONE)); + new Get(client, "Standard1", key).name("c1").value("v1") + .perform(ConsistencyLevel.ONE); - insert(client, key, "Standard1", "c3", "v3", 0, ConsistencyLevel.ALL); - assert false; - } catch (UnavailableException e) { - // [this is good] + new Insert(client, "Standard1", key).name("c3").value("v3") + .expecting(UnavailableException.class).perform(ConsistencyLevel.ALL); } finally { failure.resolve(); Thread.sleep(10000); @@ -125,26 +127,21 @@ public class MutationTest extends TestBa InetAddress coordinator = nonEndpointForKey(hosts.get(0), key, keyspace); Failure failure = controller.failHosts(endpoints.subList(1, endpoints.size())); //kill all but one nodes - Thread.sleep(10000); client = controller.createClient(coordinator); client.set_keyspace(keyspace); try { - insert(client, key, "Standard1", "c1", "v1", 0, ConsistencyLevel.QUORUM); - assert false; - } catch (UnavailableException e) { - // [this is good] + new Insert(client, "Standard1", key).name("c1").value("v1") + .expecting(UnavailableException.class).perform(ConsistencyLevel.QUORUM); } finally { failure.resolve(); - Thread.sleep(10000); } // with all nodes up - insert(client, key, "Standard1", "c2", "v2", 0, ConsistencyLevel.QUORUM); + new Insert(client, "Standard1", key).name("c2").value("v2").perform(ConsistencyLevel.QUORUM); failure = controller.failHosts(endpoints.get(0)); - Thread.sleep(10000); try { - getColumn(client, key, "Standard1", "c2", ConsistencyLevel.QUORUM); + new Get(client, "Standard1", key).name("c2").value("v2").perform(ConsistencyLevel.QUORUM); } finally { failure.resolve(); Thread.sleep(10000); @@ -180,12 +177,9 @@ public class MutationTest extends TestBa // read with all (success) Failure failure = controller.failHosts(endpoints); - Thread.sleep(10000); try { - insert(client, key, "Standard1", "c2", "v2", 0, ConsistencyLevel.ONE); - assert false; - } catch (UnavailableException e) { - // this is good + new Insert(client, "Standard1", key).name("c2").value("v2") + .expecting(UnavailableException.class).perform(ConsistencyLevel.ONE); } finally { failure.resolve(); } @@ -210,6 +204,122 @@ public class MutationTest extends TestBa return client.get(key, cpath, cl).column; } + protected class Get extends RetryingAction + { + public Get(Cassandra.Client client, String cf, ByteBuffer key) + { + super(client, cf, key); + } + + public void tryPerformAction(ConsistencyLevel cl) throws Exception + { + assertColumnEqual(name, value, timestamp, getColumn(client, key, cf, name, cl)); + } + } + + protected class Insert extends RetryingAction + { + public Insert(Cassandra.Client client, String cf, ByteBuffer key) + { + super(client, cf, key); + } + + public void tryPerformAction(ConsistencyLevel cl) throws Exception + { + insert(client, key, cf, name, value, timestamp, cl); + } + } + + /** Performs an action repeatedly until timeout, success or failure. */ + protected abstract class RetryingAction + { + protected Cassandra.Client client; + protected String cf; + protected ByteBuffer key; + protected String name; + protected String value; + protected long timestamp; + + private Set> expected = new HashSet>(); + private long timeout = StorageService.RING_DELAY; + + public RetryingAction(Cassandra.Client client, String cf, ByteBuffer key) + { + this.client = client; + this.cf = cf; + this.key = key; + this.timestamp = 0; + } + + public RetryingAction name(String name) + { + this.name = name; return this; + } + + /** The value to expect for the return column, or null to expect the column to be missing. */ + public RetryingAction value(String value) + { + this.value = value; return this; + } + + /** The total time to allow before failing. */ + public RetryingAction timeout(long timeout) + { + this.timeout = timeout; return this; + } + + /** The expected timestamp of the returned column. */ + public RetryingAction timestamp(long timestamp) + { + this.timestamp = timestamp; return this; + } + + /** The exception classes that indicate success. */ + public RetryingAction expecting(Class... tempExceptions) + { + this.expected.clear(); + for (Class exclass : tempExceptions) + expected.add((Class)exclass); + return this; + } + + public void perform(ConsistencyLevel cl) throws AssertionError + { + long deadline = System.currentTimeMillis() + timeout; + int attempts = 0; + String template = "%s for " + this + " after %d attempt(s) with %d ms to spare."; + Exception e = null; + while(deadline > System.currentTimeMillis()) + { + try + { + attempts++; + tryPerformAction(cl); + logger.info(String.format(template, "Succeeded", attempts, deadline - System.currentTimeMillis())); + return; + } + catch (Exception ex) + { + e = ex; + if (!expected.contains(ex.getClass())) + continue; + logger.info(String.format(template, "Caught expected exception: " + e, attempts, deadline - System.currentTimeMillis())); + return; + } + } + String err = String.format(template, "Caught unexpected: " + e, attempts, deadline - System.currentTimeMillis()); + logger.error(err); + throw new AssertionError(err); + } + + public String toString() + { + return this.getClass() + "(" + key + "," + name + ")"; + } + + protected abstract void tryPerformAction(ConsistencyLevel cl) throws Exception; + } + protected List get_slice(Cassandra.Client client, ByteBuffer key, String cf, ConsistencyLevel cl) throws InvalidRequestException, UnavailableException, TimedOutException, TException {