cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From eev...@apache.org
Subject [2/5] git commit: p/4443/040_tests
Date Fri, 14 Sep 2012 15:16:18 GMT
p/4443/040_tests

Relocation unit tests.

Patch by eevans; reviewed by Brandon Williams for CASSANDRA-4559


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d09b47c3
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d09b47c3
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d09b47c3

Branch: refs/heads/trunk
Commit: d09b47c3e7330c8a68201fabda88aca38ce2465d
Parents: a02bbf5
Author: Eric Evans <eevans@apache.org>
Authored: Fri Sep 14 10:09:10 2012 -0500
Committer: Eric Evans <eevans@apache.org>
Committed: Fri Sep 14 10:18:02 2012 -0500

----------------------------------------------------------------------
 .../org/apache/cassandra/service/RelocateTest.java |  191 +++++++++++++++
 1 files changed, 191 insertions(+), 0 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/d09b47c3/test/unit/org/apache/cassandra/service/RelocateTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/RelocateTest.java b/test/unit/org/apache/cassandra/service/RelocateTest.java
new file mode 100644
index 0000000..64604e0
--- /dev/null
+++ b/test/unit/org/apache/cassandra/service/RelocateTest.java
@@ -0,0 +1,191 @@
+package org.apache.cassandra.service;
+
+import static org.junit.Assert.*;
+
+import java.math.BigInteger;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.config.KSMetaData;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.dht.BigIntegerToken;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.RandomPartitioner;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.gms.ApplicationState;
+import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.gms.VersionedValue;
+import org.apache.cassandra.locator.AbstractReplicationStrategy;
+import org.apache.cassandra.locator.SimpleSnitch;
+import org.apache.cassandra.locator.TokenMetadata;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class RelocateTest
+{
+    private static final int TOKENS_PER_NODE = 256;
+    private static final int TOKEN_STEP = 10;
+    private static final IPartitioner<?> partitioner = new RandomPartitioner();
+    private static IPartitioner<?> oldPartitioner;
+    private static VersionedValue.VersionedValueFactory vvFactory;
+
+    private StorageService ss = StorageService.instance;
+    private TokenMetadata tmd = StorageService.instance.getTokenMetadata();
+
+    @Before
+    public void init()
+    {
+        tmd.clearUnsafe();
+    }
+
+    @BeforeClass
+    public static void setUp() throws Exception
+    {
+        oldPartitioner = StorageService.instance.setPartitionerUnsafe(partitioner);
+        SchemaLoader.loadSchema();
+        vvFactory = new VersionedValue.VersionedValueFactory(partitioner);
+    }
+
+    @AfterClass
+    public static void tearDown() throws Exception
+    {
+        StorageService.instance.setPartitionerUnsafe(oldPartitioner);
+        SchemaLoader.stopGossiper();
+    }
+
+    /** Setup a virtual node ring */
+    private static Map<Token<?>, InetAddress> createInitialRing(int size) throws
UnknownHostException
+    {
+        Map<Token<?>, InetAddress> tokenMap = new HashMap<Token<?>,
InetAddress>();
+        int currentToken = TOKEN_STEP;
+
+        for(int i = 0; i < size; i++)
+        {
+            InetAddress endpoint = InetAddress.getByName("127.0.0." + String.valueOf(i +
1));
+            Gossiper.instance.initializeNodeUnsafe(endpoint, UUID.randomUUID(), 1);
+            List<Token> tokens = new ArrayList<Token>();
+
+            for (int j = 0; j < TOKENS_PER_NODE; j++)
+            {
+                Token token = new BigIntegerToken(String.valueOf(currentToken));
+                tokenMap.put(token, endpoint);
+                tokens.add(token);
+                currentToken += TOKEN_STEP;
+            }
+
+            Gossiper.instance.injectApplicationState(endpoint, ApplicationState.TOKENS, vvFactory.tokens(tokens));
+            StorageService.instance.onChange(endpoint, ApplicationState.STATUS, vvFactory.normal(tokens));
+        }
+
+        return tokenMap;
+    }
+
+    // Copy-pasta from MoveTest.java
+    private AbstractReplicationStrategy getStrategy(String table, TokenMetadata tmd) throws
ConfigurationException
+    {
+        KSMetaData ksmd = Schema.instance.getKSMetaData(table);
+        return AbstractReplicationStrategy.createReplicationStrategy(
+                table,
+                ksmd.strategyClass,
+                tmd,
+                new SimpleSnitch(),
+                ksmd.strategyOptions);
+    }
+
+    /** Ensure proper write endpoints during relocation */
+    @Test
+    public void testWriteEndpointsDuringRelocate() throws Exception
+    {
+        Map<Token<?>, InetAddress> tokenMap = createInitialRing(5);
+        Map<Token, List<InetAddress>> expectedEndpoints = new HashMap<Token,
List<InetAddress>>();
+
+
+        for (Token<?> token : tokenMap.keySet())
+        {
+            BigIntegerToken keyToken = new BigIntegerToken(((BigInteger)token.token).add(new
BigInteger("5")));
+            List<InetAddress> endpoints = new ArrayList<InetAddress>();
+            Iterator<Token> tokenIter = TokenMetadata.ringIterator(tmd.sortedTokens(),
keyToken, false);
+            while (tokenIter.hasNext())
+            {
+                InetAddress ep = tmd.getEndpoint(tokenIter.next());
+                if (!endpoints.contains(ep))
+                    endpoints.add(ep);
+            }
+            expectedEndpoints.put(keyToken, endpoints);
+        }
+
+        // Relocate the first token from the first endpoint, to the second endpoint.
+        Token relocateToken = new BigIntegerToken(String.valueOf(TOKEN_STEP));
+        ss.onChange(
+                InetAddress.getByName("127.0.0.2"),
+                ApplicationState.STATUS,
+                vvFactory.relocating(Collections.singleton(relocateToken)));
+        assertTrue(tmd.isRelocating(relocateToken));
+
+        AbstractReplicationStrategy strategy;
+        for (String table : Schema.instance.getNonSystemTables())
+        {
+            strategy = getStrategy(table, tmd);
+            for (Token token : tokenMap.keySet())
+            {
+                BigIntegerToken keyToken = new BigIntegerToken(((BigInteger)token.token).add(new
BigInteger("5")));
+
+                HashSet<InetAddress> actual = new HashSet<InetAddress>(tmd.getWriteEndpoints(keyToken,
table, strategy.calculateNaturalEndpoints(keyToken, tmd.cloneOnlyTokenMap())));
+                HashSet<InetAddress> expected = new HashSet<InetAddress>();
+
+                for (int i = 0; i < actual.size(); i++)
+                    expected.add(expectedEndpoints.get(keyToken).get(i));
+
+                assertEquals("mismatched endpoint sets", expected, actual);
+            }
+        }
+    }
+
+    /** Use STATUS changes to trigger membership update and validate results. */
+    @Test
+    public void testRelocationSuccess() throws UnknownHostException
+    {
+        createInitialRing(5);
+
+        // Node handling the relocation (dst), and the token being relocated (src).
+        InetAddress relocator = InetAddress.getByName("127.0.0.3");
+        Token relocatee = new BigIntegerToken(String.valueOf(TOKEN_STEP));
+
+        // Send RELOCATING and ensure token status
+        ss.onChange(relocator, ApplicationState.STATUS, vvFactory.relocating(Collections.singleton(relocatee)));
+        assertTrue(tmd.isRelocating(relocatee));
+
+        // Create a list of the endpoint's existing tokens, and add the relocatee to it.
+        List<Token> tokens = new ArrayList<Token>(tmd.getTokens(relocator));
+        tokens.add(relocatee);
+
+        // Send a normal status, then ensure all is copesetic.
+        Gossiper.instance.injectApplicationState(relocator, ApplicationState.TOKENS, vvFactory.tokens(tokens));
+        ss.onChange(relocator, ApplicationState.STATUS, vvFactory.normal(tokens));
+
+        // Relocating entries are removed after RING_DELAY
+        try
+        {
+            Thread.sleep(StorageService.RING_DELAY + 10);
+        }
+        catch (InterruptedException e)
+        {
+            System.err.println("ACHTUNG! Interrupted; testRelocationSuccess() will almost
certainly fail!");
+        }
+
+        assertTrue(!tmd.isRelocating(relocatee));
+        assertEquals(tmd.getEndpoint(relocatee), relocator);
+    }
+}


Mime
View raw message