Subject: svn commit: r892450 - in /incubator/cassandra/trunk: src/java/org/apache/cassandra/config/ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/service/ src/java/org/apache/cassandra/utils/ test/unit/org/apache/cassandra/config/ test/unit/or...
Date: Sat, 19 Dec 2009 08:03:41 -0000
From: goffinet@apache.org
To: cassandra-commits@incubator.apache.org
Reply-To: cassandra-dev@incubator.apache.org
Message-Id: <20091219080341.CCC8B2388978@eris.apache.org>

Author: goffinet
Date: Sat Dec 19 08:03:40 2009
New Revision: 892450

URL: http://svn.apache.org/viewvc?rev=892450&view=rev
Log:
Repair should never reuse a tree. AEService currently 'caches' MerkleTrees that
have been generated by the local node, and can respond to a request for a tree
with a cached version.

patch by stuhood; reviewed by junaro for CASSANDRA-640

Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/Pair.java
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/config/DatabaseDescriptorTest.java
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=892450&r1=892449&r2=892450&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java Sat Dec 19 08:03:40 2009
@@ -989,4 +989,12 @@
     {
         return autoBootstrap_;
     }
+
+    /**
+     * For testing purposes.
+ */ + static void setReplicationFactorUnsafe(int factor) + { + replicationFactor_ = factor; + } } Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=892450&r1=892449&r2=892450&view=diff ============================================================================== --- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original) +++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Sat Dec 19 08:03:40 2009 @@ -918,8 +918,7 @@ writer = new SSTableWriter(newFilename, expectedBloomFilterSize, StorageService.getPartitioner()); // validate the CF as we iterate over it - InetAddress initiator = major ? FBUtilities.getLocalAddress() : null; - AntiEntropyService.IValidator validator = AntiEntropyService.instance().getValidator(table_, columnFamily_, initiator); + AntiEntropyService.IValidator validator = AntiEntropyService.instance().getValidator(table_, columnFamily_, null, major); validator.prepare(); while (nni.hasNext()) { @@ -983,7 +982,7 @@ Iterator nni = new FilterIterator(ci, PredicateUtils.notNullPredicate()); // validate the CF as we iterate over it - AntiEntropyService.IValidator validator = AntiEntropyService.instance().getValidator(table_, columnFamily_, initiator); + AntiEntropyService.IValidator validator = AntiEntropyService.instance().getValidator(table_, columnFamily_, initiator, true); validator.prepare(); while (nni.hasNext()) { Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java?rev=892450&r1=892449&r2=892450&view=diff ============================================================================== --- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java (original) +++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java Sat Dec 19 08:03:40 2009 @@ -41,16 +41,13 @@ import org.apache.cassandra.net.IVerbHandler; import org.apache.cassandra.net.Message; import org.apache.cassandra.net.MessagingService; -import org.apache.cassandra.utils.Cachetable; -import org.apache.cassandra.utils.FBUtilities; -import org.apache.cassandra.utils.MerkleTree; +import org.apache.cassandra.utils.*; import org.apache.log4j.Logger; import com.google.common.collect.Collections2; import com.google.common.base.Predicate; import com.google.common.base.Predicates; -import com.reardencommerce.kernel.collections.shared.evictable.ConcurrentLinkedHashMap; /** * AntiEntropyService encapsulates "validating" (hashing) individual column families, @@ -58,7 +55,7 @@ * and then triggering repairs for disagreeing ranges. * * Every Tree conversation has an 'initiator', where valid trees are sent after generation - * and where the local and remote tree will rendezvous in register(cf, endpoint, tree). + * and where the local and remote tree will rendezvous in rendezvous(cf, endpoint, tree). * Once the trees rendezvous, a Differencer is executed and the service can trigger repairs * for disagreeing ranges. * @@ -71,7 +68,7 @@ * the column family. 
* * Automatic compactions will also validate a column family and broadcast TreeResponses, but * since TreeRequest messages are not sent to neighboring nodes, repairs will only occur if two - * nodes happen to perform automatic compactions within TREE_CACHE_LIFETIME of one another. + * nodes happen to perform automatic compactions within TREE_STORE_TIMEOUT of one another. * 2. The compaction process validates the column family by: * * Calling getValidator(), which can return a NoopValidator if validation should not be performed, * * Calling IValidator.prepare(), which samples the column family to determine key distribution, @@ -80,7 +77,7 @@ * * If getValidator decided that the column family should be validated, calling complete() * indicates that a valid MerkleTree has been created for the column family. * * The valid tree is broadcast to neighboring nodes via TreeResponse, and stored locally. - * 3. When a node receives a TreeResponse, it passes the tree to register(), which checks for trees to + * 3. When a node receives a TreeResponse, it passes the tree to rendezvous(), which checks for trees to * rendezvous with / compare to: * * If the tree is local, it is cached, and compared to any trees that were received from neighbors. * * If the tree is remote, it is immediately compared to a local tree if one is cached. Otherwise, @@ -89,13 +86,6 @@ * 4. Differencers are executed in AE_SERVICE_STAGE, to compare the two trees. * * Based on the fraction of disagreement between the trees, the differencer will * either perform repair via the io.Streaming api, or via RangeCommand read repairs. - * 5. TODO: Because a local tree is stored for TREE_CACHE_LIFETIME, it is possible to perform - * redundant repairs when repairs are triggered manually. Because of the SSTable architecture, - * this doesn't cause any problems except excess data transfer, but: - * * One possible solution is to maintain the local tree in memory by invalidating ranges when they - * change, and only performing partial compactions/validations. - * * Another would be to only communicate with one neighbor at a time, meaning that an additional - * compaction is required for every neighbor. */ public class AntiEntropyService { @@ -105,26 +95,36 @@ public final static String TREE_REQUEST_VERB = "TREE-REQUEST-VERB"; public final static String TREE_RESPONSE_VERB = "TREE-RESPONSE-VERB"; - // millisecond lifetime to store remote trees before they become stale - public final static long TREE_CACHE_LIFETIME = 600000; + // millisecond lifetime to store trees before they become stale + public final static long TREE_STORE_TIMEOUT = 600000; + // max millisecond frequency that natural (automatic) repairs should run at + public final static long NATURAL_REPAIR_FREQUENCY = 3600000; // singleton enforcement private static volatile AntiEntropyService aeService; /** - * Map of endpoints to recently generated trees for their column families. - * Remote trees are removed from the map once they have been compared to - * local trees, but local trees are cached for multiple comparisons. + * Map of CFPair to timestamp of the beginning of the last natural repair. */ - private final ConcurrentLinkedHashMap> trees; + private final ConcurrentMap naturalRepairs; + + /** + * Map of column families to remote endpoints that need to rendezvous. The + * first endpoint to arrive at the rendezvous will store its tree in the + * appropriate slot of the TreePair object, and the second to arrive will + * remove the stored tree, and compare it. 
+ * + * This map is only accessed from AE_SERVICE_STAGE, so it is not synchronized. + */ + private final Map> trees; public static AntiEntropyService instance() { - if ( aeService == null ) + if (aeService == null) { - synchronized ( AntiEntropyService.class ) + synchronized (AntiEntropyService.class) { - if ( aeService == null ) + if (aeService == null) { aeService = new AntiEntropyService(); } @@ -142,88 +142,95 @@ MessagingService.instance().registerVerbHandlers(TREE_REQUEST_VERB, new TreeRequestVerbHandler()); MessagingService.instance().registerVerbHandlers(TREE_RESPONSE_VERB, new TreeResponseVerbHandler()); - trees = ConcurrentLinkedHashMap.create(ConcurrentLinkedHashMap.EvictionPolicy.LRU, - DatabaseDescriptor.getReplicationFactor()+1); + naturalRepairs = new ConcurrentHashMap(); + trees = new HashMap>(); } /** - * @param endpoint Endpoint to fetch trees for. - * @return The store of trees for the given endpoint. + * Returns the map of waiting rendezvous endpoints to trees for the given cf. + * Should only be called within AE_SERVICE_STAGE. + * + * @param cf Column family to fetch trees for. + * @return The store of trees for the given cf. */ - private Cachetable cacheForEndpoint(InetAddress endpoint) + private Cachetable rendezvousPairs(CFPair cf) { - Cachetable etrees = trees.get(endpoint); - if (etrees == null) + Cachetable ctrees = trees.get(cf); + if (ctrees == null) { - // double check the creation - Cachetable probable = new Cachetable(TREE_CACHE_LIFETIME); - if ((etrees = trees.putIfAbsent(endpoint, probable)) == null) - { - // created new store for this endpoint - etrees = probable; - } + ctrees = new Cachetable(TREE_STORE_TIMEOUT); + trees.put(cf, ctrees); } - return etrees; + return ctrees; } /** - * Register a tree from the given endpoint to be compared to neighbor trees + * Return all of the neighbors with whom we share data. + */ + private static Collection getNeighbors() + { + InetAddress local = FBUtilities.getLocalAddress(); + StorageService ss = StorageService.instance(); + return Collections2.filter(ss.getNaturalEndpoints(ss.getLocalToken()), + Predicates.not(Predicates.equalTo(local))); + } + + /** + * Register a tree from the given endpoint to be compared to the appropriate trees * in AE_SERVICE_STAGE when they become available. * * @param cf The column family of the tree. * @param endpoint The endpoint which owns the given tree. * @param tree The tree for the endpoint. 
*/ - void register(CFTuple cf, InetAddress endpoint, MerkleTree tree) + void rendezvous(CFPair cf, InetAddress endpoint, MerkleTree tree) { InetAddress LOCAL = FBUtilities.getLocalAddress(); - // store the tree, possibly replacing an older copy - Cachetable etrees = cacheForEndpoint(endpoint); + // return the rendezvous pairs for this cf + Cachetable ctrees = rendezvousPairs(cf); List differencers = new ArrayList(); if (LOCAL.equals(endpoint)) { - // we stored a local tree: queue differencing for all remote trees - for (Map.Entry> entry : trees.entrySet()) + // we're registering a local tree: rendezvous with all remote trees + for (InetAddress neighbor : getNeighbors()) { - if (LOCAL.equals(entry.getKey())) - { - // don't compare to ourself - continue; - } - MerkleTree remotetree = entry.getValue().remove(cf); - if (remotetree == null) + TreePair waiting = ctrees.remove(neighbor); + if (waiting != null && waiting.right != null) { - // no tree stored for this endpoint at the moment + // the neighbor beat us to the rendezvous: queue differencing + differencers.add(new Differencer(cf, LOCAL, neighbor, + tree, waiting.right)); continue; } - differencers.add(new Differencer(cf, LOCAL, entry.getKey(), tree, remotetree)); + // else, the local tree is first to the rendezvous: store and wait + ctrees.put(neighbor, new TreePair(tree, null)); + logger.debug("Stored local tree for " + cf + " to wait for " + neighbor); } - etrees.put(cf, tree); - logger.debug("Cached local tree for " + cf); } else { - // we stored a remote tree: queue differencing for local tree - MerkleTree localtree = cacheForEndpoint(LOCAL).get(cf); - if (localtree != null) - { - // compare immediately - differencers.add(new Differencer(cf, LOCAL, endpoint, localtree, tree)); + // we're registering a remote tree: rendezvous with the local tree + TreePair waiting = ctrees.remove(endpoint); + if (waiting != null && waiting.left != null) + { + // the local tree beat us to the rendezvous: queue differencing + differencers.add(new Differencer(cf, LOCAL, endpoint, + waiting.left, tree)); } else { - // cache for later comparison - etrees.put(cf, tree); - logger.debug("Cached remote tree from " + endpoint + " for " + cf); + // else, the remote tree is first to the rendezvous: store and wait + ctrees.put(endpoint, new TreePair(null, tree)); + logger.debug("Stored remote tree for " + cf + " from " + endpoint); } } for (Differencer differencer : differencers) { - logger.debug("Queueing comparison " + differencer); + logger.info("Queueing comparison " + differencer); StageManager.getStage(AE_SERVICE_STAGE).execute(differencer); } } @@ -258,12 +265,40 @@ * * @param table Table containing cf. * @param cf The column family. - * @param endpoint The endpoint that generated the tree. - * @return the cached tree for the given cf and endpoint. + * @param remote The remote endpoint for the rendezvous. + * @return The tree pair for the given rendezvous if it exists, else null. */ - MerkleTree getCachedTree(String table, String cf, InetAddress endpoint) + TreePair getRendezvousPair(String table, String cf, InetAddress remote) { - return cacheForEndpoint(endpoint).get(new CFTuple(table, cf)); + return rendezvousPairs(new CFPair(table, cf)).get(remote); + } + + /** + * Should only be used for testing. + */ + void clearNaturalRepairs() + { + naturalRepairs.clear(); + } + + /** + * @param cf The column family. + * @return True if enough time has elapsed since the beginning of the last natural repair. 
+ */ + private boolean shouldRunNaturally(CFPair cf) + { + Long curtime = System.currentTimeMillis(); + Long pretime = naturalRepairs.putIfAbsent(cf, curtime); + if (pretime != null) + { + if (pretime < (curtime - NATURAL_REPAIR_FREQUENCY)) + // replace pretime with curtime, unless someone beat us to it + return naturalRepairs.replace(cf, pretime, curtime); + // need to wait longer + logger.debug("Skipping natural repair: last occurred " + (curtime - pretime) + "ms ago."); + return false; + } + return true; } /** @@ -273,18 +308,21 @@ * @param table The table name containing the column family. * @param cf The column family name. * @param initiator Endpoint that initially triggered this validation, or null if - * the validation will not see all of the data contained in the column family. + * the validation is occuring due to a natural major compaction. + * @param major True if the validator will see all of the data contained in the column family. * @return A Validator. */ - public IValidator getValidator(String table, String cf, InetAddress initiator) + public IValidator getValidator(String table, String cf, InetAddress initiator, boolean major) { - if (initiator == null || table.equals(Table.SYSTEM_TABLE)) + if (!major || table.equals(Table.SYSTEM_TABLE)) return new NoopValidator(); - else if (StorageService.instance().getTokenMetadata().sortedTokens().size() < 1) + if (StorageService.instance().getTokenMetadata().sortedTokens().size() < 1) // gossiper isn't started return new NoopValidator(); - else - return new Validator(new CFTuple(table, cf), initiator); + CFPair cfpair = new CFPair(table, cf); + if (initiator == null && !shouldRunNaturally(cfpair)) + return new NoopValidator(); + return new Validator(cfpair); } /** @@ -307,8 +345,7 @@ */ public static class Validator implements IValidator, Callable { - public final CFTuple cf; - public final InetAddress initiator; + public final CFPair cf; public final MerkleTree tree; // the minimum token sorts first, but falls into the last range @@ -322,20 +359,18 @@ public final static Predicate DKPRED = Predicates.alwaysTrue(); public final static MerkleTree.RowHash EMPTY_ROW = new MerkleTree.RowHash(null, new byte[0]); - Validator(CFTuple cf, InetAddress initiator) + Validator(CFPair cf) { this(cf, - initiator, // TODO: memory usage (maxsize) should either be tunable per // CF, globally, or as shared for all CFs in a cluster new MerkleTree(DatabaseDescriptor.getPartitioner(), MerkleTree.RECOMMENDED_DEPTH, (int)Math.pow(2, 15))); } - Validator(CFTuple cf, InetAddress initiator, MerkleTree tree) + Validator(CFPair cf, MerkleTree tree) { - assert cf != null && initiator != null && tree != null; + assert cf != null && tree != null; this.cf = cf; - this.initiator = initiator; this.tree = tree; minrows = new ArrayList(); mintoken = null; @@ -350,7 +385,7 @@ { public boolean apply(SSTable ss) { - return cf.table.equals(ss.getTableName()) && cf.cf.equals(ss.getColumnFamilyName()); + return cf.left.equals(ss.getTableName()) && cf.right.equals(ss.getColumnFamilyName()); } }; List keys = SSTableReader.getIndexedDecoratedKeysFor(cfpred, DKPRED); @@ -435,8 +470,7 @@ } /** - * Depending on the initiator for the validation, either registers - * trees to be compared locally in AE_SERVICE_STAGE, or remotely. + * Registers the newly created tree for rendezvous in AE_SERVICE_STAGE. */ public void complete() { @@ -459,9 +493,8 @@ } /** - * Called after the valdation lifecycle to trigger additional action - * with the now valid tree. 
Runs in AE_SERVICE_STAGE: depending on - * which node initiated validation, performs different actions. + * Called after the validation lifecycle to trigger additional action + * with the now valid tree. Runs in AE_SERVICE_STAGE. * * @return A meaningless object. */ @@ -469,13 +502,11 @@ { AntiEntropyService aes = AntiEntropyService.instance(); InetAddress local = FBUtilities.getLocalAddress(); - StorageService ss = StorageService.instance(); - Collection neighbors = Collections2.filter(ss.getNaturalEndpoints(ss.getLocalToken()), - Predicates.not(Predicates.equalTo(local))); + Collection neighbors = getNeighbors(); - // cache the local tree and then broadcast it to our neighbors - aes.register(cf, local, tree); + // store the local tree and then broadcast it to our neighbors + aes.rendezvous(cf, local, tree); aes.notifyNeighbors(this, local, neighbors); // return any old object @@ -519,14 +550,14 @@ */ public static class Differencer implements Runnable { - public final CFTuple cf; + public final CFPair cf; public final InetAddress local; public final InetAddress remote; public final MerkleTree ltree; public final MerkleTree rtree; public final List differences; - public Differencer(CFTuple cf, InetAddress local, InetAddress remote, MerkleTree ltree, MerkleTree rtree) + public Differencer(CFPair cf, InetAddress local, InetAddress remote, MerkleTree ltree, MerkleTree rtree) { this.cf = cf; this.local = local; @@ -617,12 +648,12 @@ void performStreamingRepair() throws IOException { logger.info("Performing streaming repair of " + differences.size() + " ranges to " + remote + " for " + cf); - ColumnFamilyStore cfstore = Table.open(cf.table).getColumnFamilyStore(cf.cf); + ColumnFamilyStore cfstore = Table.open(cf.left).getColumnFamilyStore(cf.right); try { List ranges = new ArrayList(differences); List sstables = CompactionManager.instance.submitAnti(cfstore, ranges, remote).get(); - Streaming.transferSSTables(remote, sstables, cf.table); + Streaming.transferSSTables(remote, sstables, cf.left); } catch(Exception e) { @@ -639,11 +670,9 @@ /** * Handler for requests from remote nodes to generate a valid tree. - * - * The payload is an EndpointCF triple representing the columnfamily to validate - * and the initiating endpoint. + * The payload is a CFPair representing the columnfamily to validate. 
*/ - public static class TreeRequestVerbHandler implements IVerbHandler, ICompactSerializer + public static class TreeRequestVerbHandler implements IVerbHandler, ICompactSerializer { public static final TreeRequestVerbHandler SERIALIZER = new TreeRequestVerbHandler(); static Message makeVerb(String table, String cf) @@ -652,7 +681,7 @@ { ByteArrayOutputStream bos = new ByteArrayOutputStream(); DataOutputStream dos = new DataOutputStream(bos); - SERIALIZER.serialize(new CFTuple(table, cf), dos); + SERIALIZER.serialize(new CFPair(table, cf), dos); return new Message(FBUtilities.getLocalAddress(), AE_SERVICE_STAGE, TREE_REQUEST_VERB, bos.toByteArray()); } catch(IOException e) @@ -661,21 +690,19 @@ } } - public void serialize(CFTuple treerequest, DataOutputStream dos) throws IOException + public void serialize(CFPair treerequest, DataOutputStream dos) throws IOException { - dos.writeUTF(treerequest.table); - dos.writeUTF(treerequest.cf); + dos.writeUTF(treerequest.left); + dos.writeUTF(treerequest.right); } - public CFTuple deserialize(DataInputStream dis) throws IOException + public CFPair deserialize(DataInputStream dis) throws IOException { - return new CFTuple(dis.readUTF(), dis.readUTF()); + return new CFPair(dis.readUTF(), dis.readUTF()); } /** - * If we have a recently generated cached tree, respond with it immediately: - * Otherwise, trigger a readonly compaction which will broadcast the tree - * upon completion. + * Trigger a readonly compaction which will broadcast the tree upon completion. */ public void doVerb(Message message) { @@ -685,30 +712,13 @@ try { - CFTuple request = this.deserialize(buffer); - - // check for cached local tree - InetAddress local = FBUtilities.getLocalAddress(); - MerkleTree cached = AntiEntropyService.instance().getCachedTree(request.table, request.cf, local); - if (cached != null) - { - if (local.equals(message.getFrom())) - { - // we are the requestor, and we already have a cached tree - return; - } - // respond immediately with the recently generated tree - Validator valid = new Validator(request, message.getFrom(), cached); - Message response = TreeResponseVerbHandler.makeVerb(local, valid); - MessagingService.instance().sendOneWay(response, message.getFrom()); - logger.debug("Answered request from " + message.getFrom() + " for " + request + " with cached tree."); - return; - } + CFPair request = this.deserialize(buffer); // trigger readonly-compaction logger.debug("Queueing readonly compaction for request from " + message.getFrom() + " for " + request); - Table table = Table.open(request.table); - CompactionManager.instance.submitReadonly(table.getColumnFamilyStore(request.cf), message.getFrom()); + Table table = Table.open(request.left); + CompactionManager.instance.submitReadonly(table.getColumnFamilyStore(request.right), + message.getFrom()); } catch (IOException e) { @@ -718,8 +728,7 @@ } /** - * Handler for responses from remote nodes that contain a valid tree. - * + * Handler for responses from remote nodes which contain a valid tree. * The payload is a completed Validator object from the remote endpoint. 
*/ public static class TreeResponseVerbHandler implements IVerbHandler, ICompactSerializer @@ -744,18 +753,17 @@ { TreeRequestVerbHandler.SERIALIZER.serialize(v.cf, dos); ObjectOutputStream oos = new ObjectOutputStream(dos); - oos.writeObject(v.initiator); oos.writeObject(v.tree); oos.flush(); } public Validator deserialize(DataInputStream dis) throws IOException { - final CFTuple cf = TreeRequestVerbHandler.SERIALIZER.deserialize(dis); + final CFPair cf = TreeRequestVerbHandler.SERIALIZER.deserialize(dis); ObjectInputStream ois = new ObjectInputStream(dis); try { - Validator v = new Validator(cf, (InetAddress)ois.readObject(), (MerkleTree)ois.readObject()); + Validator v = new Validator(cf, (MerkleTree)ois.readObject()); return v; } catch(Exception e) @@ -774,7 +782,7 @@ { // deserialize the remote tree, and register it Validator rvalidator = this.deserialize(buffer); - AntiEntropyService.instance().register(rvalidator.cf, message.getFrom(), rvalidator.tree); + AntiEntropyService.instance().rendezvous(rvalidator.cf, message.getFrom(), rvalidator.tree); } catch (IOException e) { @@ -785,39 +793,26 @@ /** * A tuple of table and cf. - * TODO: Use utils.Pair once it implements hashCode/equals. */ - static final class CFTuple + static final class CFPair extends Pair { - public final String table; - public final String cf; - public CFTuple(String table, String cf) + public CFPair(String table, String cf) { + super(table, cf); assert table != null && cf != null; - this.table = table; - this.cf = cf; } + } - @Override - public int hashCode() - { - int hashCode = 31 + table.hashCode(); - return 31*hashCode + cf.hashCode(); - } - - @Override - public boolean equals(Object o) - { - if(!(o instanceof CFTuple)) - return false; - CFTuple that = (CFTuple)o; - return table.equals(that.table) && cf.equals(that.cf); - } - - @Override - public String toString() + /** + * A tuple of a local and remote tree. One of the trees should be null, but + * not both. + */ + static final class TreePair extends Pair + { + public TreePair(MerkleTree local, MerkleTree remote) { - return "[" + table + "][" + cf + "]"; + super(local, remote); + assert local != null ^ remote != null; } } } Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/Pair.java URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/Pair.java?rev=892450&r1=892449&r2=892450&view=diff ============================================================================== --- incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/Pair.java (original) +++ incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/Pair.java Sat Dec 19 08:03:40 2009 @@ -18,7 +18,9 @@ package org.apache.cassandra.utils; -public final class Pair +import com.google.common.base.Objects; + +public class Pair { public final T1 left; public final T2 right; @@ -30,17 +32,22 @@ } @Override - public int hashCode() + public final int hashCode() { - throw new UnsupportedOperationException("todo"); + int hashCode = 31 + (left == null ? 0 : left.hashCode()); + return 31*hashCode + (right == null ? 
0 : right.hashCode()); } - + @Override - public boolean equals(Object obj) + public final boolean equals(Object o) { - throw new UnsupportedOperationException("todo"); + if(!(o instanceof Pair)) + return false; + Pair that = (Pair)o; + // handles nulls properly + return Objects.equal(left, that.left) && Objects.equal(right, that.right); } - + @Override public String toString() { Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/config/DatabaseDescriptorTest.java URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/config/DatabaseDescriptorTest.java?rev=892450&r1=892449&r2=892450&view=diff ============================================================================== --- incubator/cassandra/trunk/test/unit/org/apache/cassandra/config/DatabaseDescriptorTest.java (original) +++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/config/DatabaseDescriptorTest.java Sat Dec 19 08:03:40 2009 @@ -28,4 +28,14 @@ { assertNotNull(DatabaseDescriptor.getConfigFileName(), "DatabaseDescriptor should always be able to return the file name of the config file"); } + + /** + * Allow modification of replicationFactor for testing purposes. + * TODO: A more general method of property modification would be useful, but + * will probably have to wait for a refactor away from all the static fields. + */ + public static void setReplicationFactor(int factor) + { + DatabaseDescriptor.setReplicationFactorUnsafe(factor); + } } Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java?rev=892450&r1=892449&r2=892450&view=diff ============================================================================== --- incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java (original) +++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java Sat Dec 19 08:03:40 2009 @@ -25,22 +25,19 @@ import org.apache.cassandra.concurrent.StageManager; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.filter.QueryPath; -import org.apache.cassandra.db.ColumnFamilyStoreUtils; -import org.apache.cassandra.db.ColumnFamilyStore; -import org.apache.cassandra.db.CompactionManager; -import org.apache.cassandra.db.DecoratedKey; -import org.apache.cassandra.db.RowMutation; -import org.apache.cassandra.db.Table; +import org.apache.cassandra.db.*; import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.io.CompactionIterator.CompactedRow; import org.apache.cassandra.io.DataOutputBuffer; +import org.apache.cassandra.locator.TokenMetadata; import static org.apache.cassandra.service.AntiEntropyService.*; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.MerkleTree; import org.apache.cassandra.CleanupHelper; +import org.apache.cassandra.config.DatabaseDescriptorTest; import org.junit.Before; import org.junit.Test; @@ -52,14 +49,28 @@ // table and column family to test against public AntiEntropyService aes; - public String tablename; - public String cfname; + + public static String tablename; + public static String cfname; + public static InetAddress REMOTE; static { try { + // bump the replication factor so that local overlaps with REMOTE below + DatabaseDescriptorTest.setReplicationFactor(2); + 
StorageService.instance().initServer(); + // generate a fake endpoint for which we can spoof receiving/sending trees + TokenMetadata tmd = StorageService.instance().getTokenMetadata(); + IPartitioner part = StorageService.getPartitioner(); + REMOTE = InetAddress.getByName("127.0.0.2"); + tmd.updateNormalToken(part.getMinimumToken(), REMOTE); + assert tmd.isMember(REMOTE); + + tablename = DatabaseDescriptor.getTables().get(0); + cfname = Table.open(tablename).getColumnFamilies().iterator().next(); } catch(Exception e) { @@ -71,9 +82,6 @@ public void prepare() throws Exception { aes = AntiEntropyService.instance(); - - tablename = DatabaseDescriptor.getTables().get(0); - cfname = Table.open(tablename).getColumnFamilies().iterator().next(); } @Test @@ -82,7 +90,22 @@ assert null != aes; assert aes == AntiEntropyService.instance(); } - + + @Test + public void testGetValidator() throws Throwable + { + aes.clearNaturalRepairs(); + + // not major + assert aes.getValidator(tablename, cfname, null, false) instanceof NoopValidator; + // adds entry to naturalRepairs + assert aes.getValidator(tablename, cfname, null, true) instanceof Validator; + // blocked by entry in naturalRepairs + assert aes.getValidator(tablename, cfname, null, true) instanceof NoopValidator; + // triggered manually + assert aes.getValidator(tablename, cfname, REMOTE, true) instanceof Validator; + } + @Test public void testValidatorPrepare() throws Throwable { @@ -97,7 +120,7 @@ ColumnFamilyStoreUtils.writeColumnFamily(rms); // sample - validator = new Validator(new CFTuple(tablename, cfname), LOCAL); + validator = new Validator(new CFPair(tablename, cfname)); validator.prepare(); // and confirm that the tree was split @@ -107,7 +130,7 @@ @Test public void testValidatorComplete() throws Throwable { - Validator validator = new Validator(new CFTuple(tablename, cfname), LOCAL); + Validator validator = new Validator(new CFPair(tablename, cfname)); validator.prepare(); validator.complete(); @@ -122,8 +145,7 @@ @Test public void testValidatorAdd() throws Throwable { - Validator validator = new Validator(new CFTuple(tablename, cfname), - LOCAL); + Validator validator = new Validator(new CFPair(tablename, cfname)); IPartitioner part = validator.tree.partitioner(); Token min = part.getMinimumToken(); Token mid = part.midpoint(min, min); @@ -146,7 +168,7 @@ * Build a column family with 2 or more SSTables, and then force a major compaction */ @Test - public void testTreeCaching() throws Throwable + public void testTreeStore() throws Throwable { // populate column family List rms = new LinkedList(); @@ -157,20 +179,20 @@ ColumnFamilyStoreUtils.writeColumnFamily(rms); ColumnFamilyStore store = ColumnFamilyStoreUtils.writeColumnFamily(rms); - // force a major compaction, and wait for it to finish - MerkleTree old = aes.getCachedTree(tablename, cfname, LOCAL); - CompactionManager.instance.submitMajor(store, 0).get(5000, TimeUnit.MILLISECONDS); + TreePair old = aes.getRendezvousPair(tablename, cfname, REMOTE); + // force a readonly compaction, and wait for it to finish + CompactionManager.instance.submitReadonly(store, REMOTE).get(5000, TimeUnit.MILLISECONDS); - // check that a tree was created and cached + // check that a tree was created and stored flushAES().get(5000, TimeUnit.MILLISECONDS); - assert old != aes.getCachedTree(tablename, cfname, LOCAL); + assert old != aes.getRendezvousPair(tablename, cfname, REMOTE); } @Test public void testNotifyNeighbors() throws Throwable { // generate empty tree - Validator validator = new Validator(new 
CFTuple(tablename, cfname), LOCAL); + Validator validator = new Validator(new CFPair(tablename, cfname)); validator.prepare(); validator.complete(); @@ -183,21 +205,20 @@ // confirm that our reference is not equal to the original due // to (de)serialization - assert tree != aes.getCachedTree(tablename, cfname, LOCAL); + assert tree != aes.getRendezvousPair(tablename, cfname, REMOTE).left; } @Test public void testDifferencer() throws Throwable { // generate a tree - Validator validator = new Validator(new CFTuple("ltable", "lcf"), LOCAL); + Validator validator = new Validator(new CFPair("ltable", "lcf")); validator.prepare(); // create a clone with no values filled - validator.complete(); MerkleTree ltree = validator.tree; - validator = new Validator(new CFTuple("rtable", "rcf"), LOCAL); + validator = new Validator(new CFPair("rtable", "rcf")); validator.prepare(); validator.complete(); MerkleTree rtree = validator.tree; @@ -209,7 +230,7 @@ changed.hash("non-empty hash!".getBytes()); // difference the trees - Differencer diff = new Differencer(new CFTuple(tablename, cfname), + Differencer diff = new Differencer(new CFPair(tablename, cfname), LOCAL, LOCAL, ltree, rtree); diff.run();
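
For readers following the patch without the rest of the source tree, the sketch below restates the two mechanisms the diff introduces: the per-endpoint rendezvous slot (the first tree to arrive is parked in a TreePair, and the second to arrive removes it and queues a comparison), and the putIfAbsent/replace throttle behind shouldRunNaturally(). This is a minimal, hypothetical illustration rather than the committed code: the Tree and TreePair classes, the String column-family keys, the returned comparison strings, and the RendezvousSketch name are simplified stand-ins for Cassandra's MerkleTree, TreePair, CFPair, and Differencer.

import java.net.InetAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class RendezvousSketch
{
    /** Simplified stand-in for a MerkleTree. */
    static class Tree
    {
        final String owner;
        Tree(String owner) { this.owner = owner; }
    }

    /** Stand-in for TreePair: exactly one slot is filled while a tree waits for its peer. */
    static class TreePair
    {
        final Tree local;
        final Tree remote;
        TreePair(Tree local, Tree remote)
        {
            assert (local == null) != (remote == null);
            this.local = local;
            this.remote = remote;
        }
    }

    // mirrors NATURAL_REPAIR_FREQUENCY: at most one automatic repair per cf per hour
    static final long NATURAL_REPAIR_FREQUENCY_MS = 3600000L;

    // cf -> (endpoint -> waiting half of the rendezvous); in the patch this map is only
    // touched from AE_SERVICE_STAGE, so a plain HashMap suffices
    private final Map<String, Map<InetAddress, TreePair>> trees = new HashMap<String, Map<InetAddress, TreePair>>();

    // cf -> start time of the last natural (automatic) repair
    private final ConcurrentMap<String, Long> naturalRepairs = new ConcurrentHashMap<String, Long>();

    /**
     * First arrival parks its tree in the slot; second arrival removes it and queues a
     * comparison (the real service queues a Differencer instead of returning strings).
     */
    List<String> rendezvous(String cf, InetAddress endpoint, Tree tree,
                            InetAddress localAddr, Iterable<InetAddress> neighbors)
    {
        Map<InetAddress, TreePair> slots = trees.get(cf);
        if (slots == null)
        {
            slots = new HashMap<InetAddress, TreePair>();
            trees.put(cf, slots);
        }

        List<String> comparisons = new ArrayList<String>();
        if (endpoint.equals(localAddr))
        {
            // a local tree rendezvouses with every neighbor
            for (InetAddress neighbor : neighbors)
            {
                TreePair waiting = slots.remove(neighbor);
                if (waiting != null && waiting.remote != null)
                    comparisons.add("compare " + cf + ": local vs " + neighbor); // neighbor arrived first
                else
                    slots.put(neighbor, new TreePair(tree, null));               // we are first: wait
            }
        }
        else
        {
            // a remote tree rendezvouses only with the local tree
            TreePair waiting = slots.remove(endpoint);
            if (waiting != null && waiting.local != null)
                comparisons.add("compare " + cf + ": local vs " + endpoint);     // local tree arrived first
            else
                slots.put(endpoint, new TreePair(null, tree));                   // remote is first: wait
        }
        return comparisons;
    }

    /** Same shape as shouldRunNaturally(): allow at most one natural repair per cf per window. */
    boolean shouldRunNaturally(String cf)
    {
        Long now = Long.valueOf(System.currentTimeMillis());
        Long previous = naturalRepairs.putIfAbsent(cf, now);
        if (previous == null)
            return true;                                      // first natural repair for this cf
        if (previous.longValue() < now.longValue() - NATURAL_REPAIR_FREQUENCY_MS)
            return naturalRepairs.replace(cf, previous, now); // window elapsed; only one caller wins the swap
        return false;                                         // still inside the window, skip
    }
}

Tracing one call order shows how the pieces fit: when a major compaction's validator completes, the local node calls rendezvous() with its own address and parks a TreePair(tree, null) for each neighbor; when a neighbor's TreeResponse later arrives for the same column family, the second call removes that pair and queues the comparison, so no tree is ever answered from a cache as the pre-patch code did.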