cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From alek...@apache.org
Subject [1/5] cassandra git commit: Rewrite hinted handoff
Date Wed, 19 Aug 2015 14:29:48 GMT
Repository: cassandra
Updated Branches:
  refs/heads/trunk b83b674ee -> cabdb03a9


http://git-wip-us.apache.org/repos/asf/cassandra/blob/96d41f0e/test/unit/org/apache/cassandra/hints/HintMessageTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/hints/HintMessageTest.java b/test/unit/org/apache/cassandra/hints/HintMessageTest.java
new file mode 100644
index 0000000..7ffaa54
--- /dev/null
+++ b/test/unit/org/apache/cassandra/hints/HintMessageTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.hints;
+
+import java.io.IOException;
+import java.util.UUID;
+
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.RowUpdateBuilder;
+import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static junit.framework.Assert.assertEquals;
+
+import static org.apache.cassandra.hints.HintsTestUtil.assertHintsEqual;
+import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
+
+public class HintMessageTest
+{
+    private static final String KEYSPACE = "hint_message_test";
+    private static final String TABLE = "table";
+
+    @Test
+    public void testSerializer() throws IOException
+    {
+        SchemaLoader.prepareServer();
+        SchemaLoader.createKeyspace(KEYSPACE, KeyspaceParams.simple(1), SchemaLoader.standardCFMD(KEYSPACE,
TABLE));
+
+        UUID hostId = UUID.randomUUID();
+        long now = FBUtilities.timestampMicros();
+
+        CFMetaData table = Schema.instance.getCFMetaData(KEYSPACE, TABLE);
+        Mutation mutation =
+            new RowUpdateBuilder(table, now, bytes("key"))
+                .clustering("column")
+                .add("val", "val" + 1234)
+                .build();
+        Hint hint = Hint.create(mutation, now / 1000);
+        HintMessage message = new HintMessage(hostId, hint);
+
+        // serialize
+        int serializedSize = (int) HintMessage.serializer.serializedSize(message, MessagingService.current_version);
+        DataOutputBuffer dob = new DataOutputBuffer();
+        HintMessage.serializer.serialize(message, dob, MessagingService.current_version);
+        assertEquals(serializedSize, dob.getLength());
+
+        // deserialize
+        DataInputPlus di = new DataInputBuffer(dob.buffer(), true);
+        HintMessage deserializedMessage = HintMessage.serializer.deserialize(di, MessagingService.current_version);
+
+        // compare before/after
+        assertEquals(hostId, deserializedMessage.hostId);
+        assertHintsEqual(message.hint, deserializedMessage.hint);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/96d41f0e/test/unit/org/apache/cassandra/hints/HintTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/hints/HintTest.java b/test/unit/org/apache/cassandra/hints/HintTest.java
new file mode 100644
index 0000000..c198149
--- /dev/null
+++ b/test/unit/org/apache/cassandra/hints/HintTest.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.hints;
+
+import java.io.IOException;
+
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.partitions.FilteredPartition;
+import org.apache.cassandra.db.partitions.PartitionIterator;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.schema.TableParams;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static junit.framework.Assert.*;
+
+import static org.apache.cassandra.Util.dk;
+import static org.apache.cassandra.hints.HintsTestUtil.assertHintsEqual;
+import static org.apache.cassandra.hints.HintsTestUtil.assertPartitionsEqual;
+import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
+
+public class HintTest
+{
+    private static final String KEYSPACE = "hint_test";
+    private static final String TABLE0 = "table_0";
+    private static final String TABLE1 = "table_1";
+    private static final String TABLE2 = "table_2";
+
+    @BeforeClass
+    public static void defineSchema()
+    {
+        SchemaLoader.prepareServer();
+        SchemaLoader.createKeyspace(KEYSPACE,
+                                    KeyspaceParams.simple(1),
+                                    SchemaLoader.standardCFMD(KEYSPACE, TABLE0),
+                                    SchemaLoader.standardCFMD(KEYSPACE, TABLE1),
+                                    SchemaLoader.standardCFMD(KEYSPACE, TABLE2));
+    }
+
+    @Before
+    public void resetGcGraceSeconds()
+    {
+        for (CFMetaData table : Schema.instance.getTables(KEYSPACE))
+            table.gcGraceSeconds(TableParams.DEFAULT_GC_GRACE_SECONDS);
+    }
+
+    @Test
+    public void testSerializer() throws IOException
+    {
+        long now = FBUtilities.timestampMicros();
+        Mutation mutation = createMutation("testSerializer", now);
+        Hint hint = Hint.create(mutation, now / 1000);
+
+        // serialize
+        int serializedSize = (int) Hint.serializer.serializedSize(hint, MessagingService.current_version);
+        DataOutputBuffer dob = new DataOutputBuffer();
+        Hint.serializer.serialize(hint, dob, MessagingService.current_version);
+        assertEquals(serializedSize, dob.getLength());
+
+        // deserialize
+        DataInputPlus di = new DataInputBuffer(dob.buffer(), true);
+        Hint deserializedHint = Hint.serializer.deserialize(di, MessagingService.current_version);
+
+        // compare before/after
+        assertHintsEqual(hint, deserializedHint);
+    }
+
+    @Test
+    public void testApply()
+    {
+        long now = FBUtilities.timestampMicros();
+        String key = "testApply";
+        Mutation mutation = createMutation(key, now);
+        Hint hint = Hint.create(mutation, now / 1000);
+
+        // sanity check that there is no data inside yet
+        assertNoPartitions(key, TABLE0);
+        assertNoPartitions(key, TABLE1);
+        assertNoPartitions(key, TABLE2);
+
+        hint.apply();
+
+        // assert that we can read the inserted partitions
+        for (PartitionUpdate partition : mutation.getPartitionUpdates())
+            assertPartitionsEqual(partition, readPartition(key, partition.metadata().cfName));
+    }
+
+    @Test
+    public void testApplyWithTruncation()
+    {
+        long now = FBUtilities.timestampMicros();
+        String key = "testApplyWithTruncation";
+        Mutation mutation = createMutation(key, now);
+
+        // sanity check that there is no data inside yet
+        assertNoPartitions(key, TABLE0);
+        assertNoPartitions(key, TABLE1);
+        assertNoPartitions(key, TABLE2);
+
+        // truncate TABLE1
+        Keyspace.open(KEYSPACE).getColumnFamilyStore(TABLE1).truncateBlocking();
+
+        // create and apply a hint with creation time in the past (one second before the
truncation)
+        Hint.create(mutation, now / 1000 - 1).apply();
+
+        // TABLE1 update should have been skipped and not applied, as expired
+        assertNoPartitions(key, TABLE1);
+
+        // TABLE0 and TABLE2 updates should have been applied successfully
+        assertPartitionsEqual(mutation.getPartitionUpdate(Schema.instance.getId(KEYSPACE,
TABLE0)), readPartition(key, TABLE0));
+        assertPartitionsEqual(mutation.getPartitionUpdate(Schema.instance.getId(KEYSPACE,
TABLE2)), readPartition(key, TABLE2));
+    }
+
+    @Test
+    public void testApplyWithRegularExpiration()
+    {
+        long now = FBUtilities.timestampMicros();
+        String key = "testApplyWithRegularExpiration";
+        Mutation mutation = createMutation(key, now);
+
+        // sanity check that there is no data inside yet
+        assertNoPartitions(key, TABLE0);
+        assertNoPartitions(key, TABLE1);
+        assertNoPartitions(key, TABLE2);
+
+        // lower the GC GS on TABLE0 to 0 BEFORE the hint is created
+        Schema.instance.getCFMetaData(KEYSPACE, TABLE0).gcGraceSeconds(0);
+
+        Hint.create(mutation, now / 1000).apply();
+
+        // all updates should have been skipped and not applied, as expired
+        assertNoPartitions(key, TABLE0);
+        assertNoPartitions(key, TABLE1);
+        assertNoPartitions(key, TABLE2);
+    }
+
+    @Test
+    public void testApplyWithGCGSReducedLater()
+    {
+        long now = FBUtilities.timestampMicros();
+        String key = "testApplyWithGCGSReducedLater";
+        Mutation mutation = createMutation(key, now);
+        Hint hint = Hint.create(mutation, now / 1000);
+
+        // sanity check that there is no data inside yet
+        assertNoPartitions(key, TABLE0);
+        assertNoPartitions(key, TABLE1);
+        assertNoPartitions(key, TABLE2);
+
+        // lower the GC GS on TABLE0 AFTER the hint is already created
+        Schema.instance.getCFMetaData(KEYSPACE, TABLE0).gcGraceSeconds(0);
+
+        hint.apply();
+
+        // all updates should have been skipped and not applied, as expired
+        assertNoPartitions(key, TABLE0);
+        assertNoPartitions(key, TABLE1);
+        assertNoPartitions(key, TABLE2);
+    }
+
+    private static Mutation createMutation(String key, long now)
+    {
+        Mutation mutation = new Mutation(KEYSPACE, dk(key));
+
+        new RowUpdateBuilder(Schema.instance.getCFMetaData(KEYSPACE, TABLE0), now, mutation)
+            .clustering("column0")
+            .add("val", "value0")
+            .build();
+
+        new RowUpdateBuilder(Schema.instance.getCFMetaData(KEYSPACE, TABLE1), now + 1, mutation)
+            .clustering("column1")
+            .add("val", "value1")
+            .build();
+
+        new RowUpdateBuilder(Schema.instance.getCFMetaData(KEYSPACE, TABLE2), now + 2, mutation)
+            .clustering("column2")
+            .add("val", "value2")
+            .build();
+
+        return mutation;
+    }
+
+    private static SinglePartitionReadCommand cmd(String key, String table)
+    {
+        CFMetaData meta = Schema.instance.getCFMetaData(KEYSPACE, table);
+        return SinglePartitionReadCommand.fullPartitionRead(meta, FBUtilities.nowInSeconds(),
bytes(key));
+    }
+
+    private static FilteredPartition readPartition(String key, String table)
+    {
+        return Util.getOnlyPartition(cmd(key, table));
+    }
+
+    private static void assertNoPartitions(String key, String table)
+    {
+        ReadCommand cmd = cmd(key, table);
+
+        try (ReadOrderGroup orderGroup = cmd.startOrderGroup();
+             PartitionIterator iterator = cmd.executeInternal(orderGroup))
+        {
+            assertFalse(iterator.hasNext());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/96d41f0e/test/unit/org/apache/cassandra/hints/HintsBufferTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/hints/HintsBufferTest.java b/test/unit/org/apache/cassandra/hints/HintsBufferTest.java
new file mode 100644
index 0000000..ebc333a
--- /dev/null
+++ b/test/unit/org/apache/cassandra/hints/HintsBufferTest.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.hints;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+import java.util.zip.CRC32;
+
+import com.google.common.collect.Iterables;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.RowUpdateBuilder;
+import org.apache.cassandra.db.rows.Cell;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.schema.KeyspaceParams;
+
+import static junit.framework.Assert.*;
+
+import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
+import static org.apache.cassandra.utils.FBUtilities.updateChecksum;
+
+public class HintsBufferTest
+{
+    private static final String KEYSPACE = "hints_buffer_test";
+    private static final String TABLE = "table";
+
+    private static final int HINTS_COUNT = 300_000;
+    private static final int HINT_THREADS_COUNT = 10;
+    private static final int HOST_ID_COUNT = 10;
+
+    @BeforeClass
+    public static void defineSchema()
+    {
+        SchemaLoader.prepareServer();
+        SchemaLoader.createKeyspace(KEYSPACE, KeyspaceParams.simple(1), SchemaLoader.standardCFMD(KEYSPACE,
TABLE));
+    }
+
+    @Test
+    @SuppressWarnings("resource")
+    public void testOverlyLargeAllocation()
+    {
+        // create a small, 128 bytes buffer
+        HintsBuffer buffer = HintsBuffer.create(128);
+
+        // try allocating an entry of 65 bytes (53 bytes hint + 12 bytes of overhead)
+        try
+        {
+            buffer.allocate(65 - HintsBuffer.ENTRY_OVERHEAD_SIZE);
+            fail("Allocation of the buffer should have failed but hasn't");
+        }
+        catch (IllegalArgumentException e)
+        {
+            assertEquals(String.format("Hint of %s bytes is too large - the maximum size
is 64", 65 - HintsBuffer.ENTRY_OVERHEAD_SIZE),
+                         e.getMessage());
+        }
+
+        // assert that a 1-byte smaller allocation fits properly
+        try (HintsBuffer.Allocation allocation = buffer.allocate(64 - HintsBuffer.ENTRY_OVERHEAD_SIZE))
+        {
+            assertNotNull(allocation);
+        }
+    }
+
+    @Test
+    public void testWrite() throws IOException, InterruptedException
+    {
+        // generate 10 random host ids to choose from
+        UUID[] hostIds = new UUID[HOST_ID_COUNT];
+        for (int i = 0; i < hostIds.length; i++)
+            hostIds[i] = UUID.randomUUID();
+
+        // map each index to one random UUID from the previously created UUID array
+        Random random = new Random(System.currentTimeMillis());
+        UUID[] load = new UUID[HINTS_COUNT];
+        for (int i = 0; i < load.length; i++)
+            load[i] = hostIds[random.nextInt(HOST_ID_COUNT)];
+
+        // calculate the size of a single hint (they will all have an equal size in this
test)
+        int hintSize = (int) Hint.serializer.serializedSize(createHint(0, System.currentTimeMillis()),
MessagingService.current_version);
+        int entrySize = hintSize + HintsBuffer.ENTRY_OVERHEAD_SIZE;
+
+        // allocate a slab to fit *precisely* HINTS_COUNT hints
+        int slabSize = entrySize * HINTS_COUNT;
+        HintsBuffer buffer = HintsBuffer.create(slabSize);
+
+        // use a fixed timestamp base for all mutation timestamps
+        long baseTimestamp = System.currentTimeMillis();
+
+        // create HINT_THREADS_COUNT, start them, and wait for them to finish
+        List<Thread> threads = new ArrayList<>(HINT_THREADS_COUNT);
+        for (int i = 0; i < HINT_THREADS_COUNT; i ++)
+            threads.add(new Thread(new Writer(buffer, load, hintSize, i, baseTimestamp)));
+        threads.forEach(java.lang.Thread::start);
+        for (Thread thread : threads)
+            thread.join();
+
+        // sanity check that we are full
+        assertEquals(slabSize, buffer.capacity());
+        assertEquals(0, buffer.remaining());
+
+        // try to allocate more bytes, ensure that the allocation fails
+        assertNull(buffer.allocate(1));
+
+        // a failed allocation should automatically close the oporder
+        buffer.waitForModifications();
+
+        // a failed allocation should also automatically make the buffer as closed
+        assertTrue(buffer.isClosed());
+
+        // assert that host id set in the buffer equals to hostIds
+        assertEquals(HOST_ID_COUNT, buffer.hostIds().size());
+        assertEquals(new HashSet<>(Arrays.asList(hostIds)), buffer.hostIds());
+
+        // iterate over *every written hint*, validate its content
+        for (UUID hostId : hostIds)
+        {
+            Iterator<ByteBuffer> iter = buffer.consumingHintsIterator(hostId);
+            while (iter.hasNext())
+            {
+                int idx = validateEntry(hostId, iter.next(), baseTimestamp, load);
+                load[idx] = null; // nullify each visited entry
+            }
+        }
+
+        // assert that all the entries in load array have been visited and nullified
+        for (UUID hostId : load)
+            assertNull(hostId);
+
+        // free the buffer
+        buffer.free();
+    }
+
+    private static int validateEntry(UUID hostId, ByteBuffer buffer, long baseTimestamp,
UUID[] load) throws IOException
+    {
+        CRC32 crc = new CRC32();
+        DataInputPlus di = new DataInputBuffer(buffer, true);
+
+        // read and validate size
+        int hintSize = di.readInt();
+        assertEquals(hintSize + HintsBuffer.ENTRY_OVERHEAD_SIZE, buffer.remaining());
+
+        // read and validate size crc
+        updateChecksum(crc, buffer, buffer.position(), 4);
+        assertEquals((int) crc.getValue(), di.readInt());
+
+        // read the hint and update/validate overall crc
+        Hint hint = Hint.serializer.deserialize(di, MessagingService.current_version);
+        updateChecksum(crc, buffer, buffer.position() + 8, hintSize);
+        assertEquals((int) crc.getValue(), di.readInt());
+
+        // further validate hint correctness
+        int idx = (int) (hint.creationTime - baseTimestamp);
+        assertEquals(hostId, load[idx]);
+
+        Row row = hint.mutation.getPartitionUpdates().iterator().next().iterator().next();
+        assertEquals(1, Iterables.size(row.cells()));
+
+        assertEquals(bytes(idx), row.clustering().get(0));
+        Cell cell = row.cells().iterator().next();
+        assertEquals(TimeUnit.MILLISECONDS.toMicros(baseTimestamp + idx), cell.timestamp());
+        assertEquals(bytes(idx), cell.value());
+
+        return idx;
+    }
+
+    private static Hint createHint(int idx, long baseTimestamp)
+    {
+        long timestamp = baseTimestamp + idx;
+        return Hint.create(createMutation(idx, TimeUnit.MILLISECONDS.toMicros(timestamp)),
timestamp);
+    }
+
+    private static Mutation createMutation(int index, long timestamp)
+    {
+        CFMetaData table = Schema.instance.getCFMetaData(KEYSPACE, TABLE);
+        return new RowUpdateBuilder(table, timestamp, bytes(index))
+                   .clustering(bytes(index))
+                   .add("val", bytes(index))
+                   .build();
+    }
+
+    static class Writer implements Runnable
+    {
+        final HintsBuffer buffer;
+        final UUID[] load;
+        final int hintSize;
+        final int index;
+        final long baseTimestamp;
+
+        Writer(HintsBuffer buffer, UUID[] load, int hintSize, int index, long baseTimestamp)
+        {
+            this.buffer = buffer;
+            this.load = load;
+            this.hintSize = hintSize;
+            this.index = index;
+            this.baseTimestamp = baseTimestamp;
+        }
+
+        public void run()
+        {
+            int hintsPerThread = HINTS_COUNT / HINT_THREADS_COUNT;
+            for (int i = index * hintsPerThread; i < (index + 1) * hintsPerThread; i++)
+            {
+                try (HintsBuffer.Allocation allocation = buffer.allocate(hintSize))
+                {
+                    Hint hint = createHint(i, baseTimestamp);
+                    allocation.write(Collections.singleton(load[i]), hint);
+                }
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/96d41f0e/test/unit/org/apache/cassandra/hints/HintsCatalogTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/hints/HintsCatalogTest.java b/test/unit/org/apache/cassandra/hints/HintsCatalogTest.java
new file mode 100644
index 0000000..d627fcf
--- /dev/null
+++ b/test/unit/org/apache/cassandra/hints/HintsCatalogTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.hints;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.*;
+
+import org.junit.Test;
+
+import static junit.framework.Assert.*;
+
+public class HintsCatalogTest
+{
+    @Test
+    public void loadCompletenessAndOrderTest() throws IOException
+    {
+        File directory = Files.createTempDirectory(null).toFile();
+        try
+        {
+            loadCompletenessAndOrderTest(directory);
+        }
+        finally
+        {
+            directory.deleteOnExit();
+        }
+    }
+
+    public static void loadCompletenessAndOrderTest(File directory) throws IOException
+    {
+        UUID hostId1 = UUID.randomUUID();
+        UUID hostId2 = UUID.randomUUID();
+
+        long timestamp1 = System.currentTimeMillis();
+        long timestamp2 = System.currentTimeMillis() + 1;
+        long timestamp3 = System.currentTimeMillis() + 2;
+        long timestamp4 = System.currentTimeMillis() + 3;
+
+        HintsDescriptor descriptor1 = new HintsDescriptor(hostId1, timestamp1);
+        HintsDescriptor descriptor2 = new HintsDescriptor(hostId2, timestamp3);
+        HintsDescriptor descriptor3 = new HintsDescriptor(hostId2, timestamp2);
+        HintsDescriptor descriptor4 = new HintsDescriptor(hostId1, timestamp4);
+
+        writeDescriptor(directory, descriptor1);
+        writeDescriptor(directory, descriptor2);
+        writeDescriptor(directory, descriptor3);
+        writeDescriptor(directory, descriptor4);
+
+        HintsCatalog catalog = HintsCatalog.load(directory);
+        assertEquals(2, catalog.stores().count());
+
+        HintsStore store1 = catalog.get(hostId1);
+        assertNotNull(store1);
+        assertEquals(descriptor1, store1.poll());
+        assertEquals(descriptor4, store1.poll());
+        assertNull(store1.poll());
+
+        HintsStore store2 = catalog.get(hostId2);
+        assertNotNull(store2);
+        assertEquals(descriptor3, store2.poll());
+        assertEquals(descriptor2, store2.poll());
+        assertNull(store2.poll());
+    }
+
+    @SuppressWarnings("EmptyTryBlock")
+    private static void writeDescriptor(File directory, HintsDescriptor descriptor) throws
IOException
+    {
+        try (HintsWriter ignored = HintsWriter.create(directory, descriptor))
+        {
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/96d41f0e/test/unit/org/apache/cassandra/hints/HintsDescriptorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/hints/HintsDescriptorTest.java b/test/unit/org/apache/cassandra/hints/HintsDescriptorTest.java
new file mode 100644
index 0000000..08487d1
--- /dev/null
+++ b/test/unit/org/apache/cassandra/hints/HintsDescriptorTest.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.hints;
+
+import java.io.DataInput;
+import java.io.File;
+import java.io.IOException;
+import java.util.UUID;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.io.ByteStreams;
+import com.google.common.io.Files;
+import org.junit.Test;
+
+import org.apache.cassandra.io.compress.LZ4Compressor;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertNotSame;
+import static junit.framework.Assert.fail;
+
+public class HintsDescriptorTest
+{
+    @Test
+    public void testSerializerNormal() throws IOException
+    {
+        UUID hostId = UUID.randomUUID();
+        int version = HintsDescriptor.CURRENT_VERSION;
+        long timestamp = System.currentTimeMillis();
+        ImmutableMap<String, Object> parameters =
+                ImmutableMap.of("compression", (Object) ImmutableMap.of("class_name", LZ4Compressor.class.getName()));
+        HintsDescriptor descriptor = new HintsDescriptor(hostId, version, timestamp, parameters);
+
+        testSerializeDeserializeLoop(descriptor);
+    }
+
+    @Test
+    public void testSerializerWithEmptyParameters() throws IOException
+    {
+        UUID hostId = UUID.randomUUID();
+        int version = HintsDescriptor.CURRENT_VERSION;
+        long timestamp = System.currentTimeMillis();
+        ImmutableMap<String, Object> parameters = ImmutableMap.of();
+        HintsDescriptor descriptor = new HintsDescriptor(hostId, version, timestamp, parameters);
+
+        testSerializeDeserializeLoop(descriptor);
+    }
+
+    @Test
+    public void testCorruptedDeserialize() throws IOException
+    {
+        UUID hostId = UUID.randomUUID();
+        int version = HintsDescriptor.CURRENT_VERSION;
+        long timestamp = System.currentTimeMillis();
+        ImmutableMap<String, Object> parameters = ImmutableMap.of();
+        HintsDescriptor descriptor = new HintsDescriptor(hostId, version, timestamp, parameters);
+
+        byte[] bytes = serializeDescriptor(descriptor);
+
+        // mess up the parameters size
+        bytes[28] = (byte) 0xFF;
+        bytes[29] = (byte) 0xFF;
+        bytes[30] = (byte) 0xFF;
+        bytes[31] = (byte) 0x7F;
+
+        // attempt to deserialize
+        try
+        {
+            deserializeDescriptor(bytes);
+            fail("Deserializing the descriptor should but didn't");
+        }
+        catch (IOException e)
+        {
+            assertEquals("Hints Descriptor CRC Mismatch", e.getMessage());
+        }
+    }
+
+    @Test
+    @SuppressWarnings("EmptyTryBlock")
+    public void testReadFromFile() throws IOException
+    {
+        UUID hostId = UUID.randomUUID();
+        int version = HintsDescriptor.CURRENT_VERSION;
+        long timestamp = System.currentTimeMillis();
+        ImmutableMap<String, Object> parameters = ImmutableMap.of();
+        HintsDescriptor expected = new HintsDescriptor(hostId, version, timestamp, parameters);
+
+        File directory = Files.createTempDir();
+        try
+        {
+            try (HintsWriter ignored = HintsWriter.create(directory, expected))
+            {
+            }
+            HintsDescriptor actual = HintsDescriptor.readFromFile(new File(directory, expected.fileName()).toPath());
+            assertEquals(expected, actual);
+        }
+        finally
+        {
+            directory.deleteOnExit();
+        }
+    }
+
+    private static void testSerializeDeserializeLoop(HintsDescriptor descriptor) throws IOException
+    {
+        // serialize to a byte array
+        byte[] bytes = serializeDescriptor(descriptor);
+        // make sure the sizes match
+        assertEquals(bytes.length, descriptor.serializedSize());
+        // deserialize back
+        HintsDescriptor deserializedDescriptor = deserializeDescriptor(bytes);
+        // compare equality
+        assertDescriptorsEqual(descriptor, deserializedDescriptor);
+    }
+
+    private static byte[] serializeDescriptor(HintsDescriptor descriptor) throws IOException
+    {
+        DataOutputBuffer dob = new DataOutputBuffer();
+        descriptor.serialize(dob);
+        return dob.toByteArray();
+    }
+
+    private static HintsDescriptor deserializeDescriptor(byte[] bytes) throws IOException
+    {
+        DataInput in = ByteStreams.newDataInput(bytes);
+        return HintsDescriptor.deserialize(in);
+    }
+
+    private static void assertDescriptorsEqual(HintsDescriptor expected, HintsDescriptor
actual)
+    {
+        assertNotSame(expected, actual);
+        assertEquals(expected, actual);
+        assertEquals(expected.hashCode(), actual.hashCode());
+        assertEquals(expected.hostId, actual.hostId);
+        assertEquals(expected.version, actual.version);
+        assertEquals(expected.timestamp, actual.timestamp);
+        assertEquals(expected.parameters, actual.parameters);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/96d41f0e/test/unit/org/apache/cassandra/hints/HintsTestUtil.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/hints/HintsTestUtil.java b/test/unit/org/apache/cassandra/hints/HintsTestUtil.java
new file mode 100644
index 0000000..89b532f
--- /dev/null
+++ b/test/unit/org/apache/cassandra/hints/HintsTestUtil.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.hints;
+
+import java.util.UUID;
+
+import com.google.common.collect.Iterators;
+
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.partitions.AbstractBTreePartition;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertTrue;
+
+final class HintsTestUtil
+{
+    static void assertMutationsEqual(Mutation expected, Mutation actual)
+    {
+        assertEquals(expected.key(), actual.key());
+        assertEquals(expected.getPartitionUpdates().size(), actual.getPartitionUpdates().size());
+
+        for (UUID id : expected.getColumnFamilyIds())
+            assertPartitionsEqual(expected.getPartitionUpdate(id), actual.getPartitionUpdate(id));
+    }
+
+    static void assertPartitionsEqual(AbstractBTreePartition expected, AbstractBTreePartition
actual)
+    {
+        assertEquals(expected.partitionKey(), actual.partitionKey());
+        assertEquals(expected.deletionInfo(), actual.deletionInfo());
+        assertEquals(expected.columns(), actual.columns());
+        assertTrue(Iterators.elementsEqual(expected.iterator(), actual.iterator()));
+    }
+
+    static void assertHintsEqual(Hint expected, Hint actual)
+    {
+        assertEquals(expected.mutation.getKeyspaceName(), actual.mutation.getKeyspaceName());
+        assertEquals(expected.mutation.key(), actual.mutation.key());
+        assertEquals(expected.mutation.getColumnFamilyIds(), actual.mutation.getColumnFamilyIds());
+        for (PartitionUpdate partitionUpdate : expected.mutation.getPartitionUpdates())
+            assertPartitionsEqual(partitionUpdate, actual.mutation.getPartitionUpdate(partitionUpdate.metadata().cfId));
+        assertEquals(expected.creationTime, actual.creationTime);
+        assertEquals(expected.gcgs, actual.gcgs);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/96d41f0e/test/unit/org/apache/cassandra/hints/LegacyHintsMigratorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/hints/LegacyHintsMigratorTest.java b/test/unit/org/apache/cassandra/hints/LegacyHintsMigratorTest.java
new file mode 100644
index 0000000..85e4b69
--- /dev/null
+++ b/test/unit/org/apache/cassandra/hints/LegacyHintsMigratorTest.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.hints;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.util.*;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.marshal.UUIDType;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.rows.BTreeRow;
+import org.apache.cassandra.db.rows.BufferCell;
+import org.apache.cassandra.db.rows.Cell;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.UUIDGen;
+
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertNotNull;
+import static junit.framework.Assert.assertTrue;
+
+import static org.apache.cassandra.hints.HintsTestUtil.assertMutationsEqual;
+import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
+
+// TODO: test split into several files
+@SuppressWarnings("deprecation")
+public class LegacyHintsMigratorTest
+{
+    private static final String KEYSPACE = "legacy_hints_migrator_test";
+    private static final String TABLE = "table";
+
+    @BeforeClass
+    public static void defineSchema()
+    {
+        SchemaLoader.prepareServer();
+        SchemaLoader.createKeyspace(KEYSPACE, KeyspaceParams.simple(1), SchemaLoader.standardCFMD(KEYSPACE,
TABLE));
+    }
+
+    @Test
+    public void testNothingToMigrate() throws IOException
+    {
+        File directory = Files.createTempDirectory(null).toFile();
+        try
+        {
+            testNothingToMigrate(directory);
+        }
+        finally
+        {
+            directory.deleteOnExit();
+        }
+    }
+
+    private static void testNothingToMigrate(File directory)
+    {
+        // truncate system.hints to enseure nothing inside
+        Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.LEGACY_HINTS).truncateBlocking();
+        new LegacyHintsMigrator(directory, 128 * 1024 * 1024).migrate();
+        HintsCatalog catalog = HintsCatalog.load(directory);
+        assertEquals(0, catalog.stores().count());
+    }
+
+    @Test
+    public void testMigrationIsComplete() throws IOException
+    {
+        File directory = Files.createTempDirectory(null).toFile();
+        try
+        {
+            testMigrationIsComplete(directory);
+        }
+        finally
+        {
+            directory.deleteOnExit();
+        }
+    }
+
+    private static void testMigrationIsComplete(File directory)
+    {
+        long timestamp = System.currentTimeMillis();
+
+        // write 100 mutations for each of the 10 generated endpoints
+        Map<UUID, Queue<Mutation>> mutations = new HashMap<>();
+        for (int i = 0; i < 10; i++)
+        {
+            UUID hostId = UUID.randomUUID();
+            Queue<Mutation> queue = new LinkedList<>();
+            mutations.put(hostId, queue);
+
+            for (int j = 0; j < 100; j++)
+            {
+                Mutation mutation = createMutation(j, timestamp + j);
+                queue.offer(mutation);
+                Mutation legacyHint = createLegacyHint(mutation, timestamp, hostId);
+                legacyHint.applyUnsafe();
+            }
+        }
+
+        // run the migration
+        new LegacyHintsMigrator(directory, 128 * 1024 * 1024).migrate();
+
+        // validate that the hints table is truncated now
+        assertTrue(Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.LEGACY_HINTS).isEmpty());
+
+        HintsCatalog catalog = HintsCatalog.load(directory);
+
+        // assert that we've correctly loaded 10 hints stores
+        assertEquals(10, catalog.stores().count());
+
+        // for each of the 10 stores, make sure the mutations have been migrated correctly
+        for (Map.Entry<UUID, Queue<Mutation>> entry : mutations.entrySet())
+        {
+            HintsStore store = catalog.get(entry.getKey());
+            assertNotNull(store);
+
+            HintsDescriptor descriptor = store.poll();
+            assertNotNull(descriptor);
+
+            // read all the hints
+            Queue<Hint> actualHints = new LinkedList<>();
+            try (HintsReader reader = HintsReader.open(new File(directory, descriptor.fileName())))
+            {
+                for (HintsReader.Page page : reader)
+                    page.hintsIterator().forEachRemaining(actualHints::offer);
+            }
+
+            // assert the size matches
+            assertEquals(100, actualHints.size());
+
+            // compare expected hints to actual hints
+            for (int i = 0; i < 100; i++)
+            {
+                Hint hint = actualHints.poll();
+                Mutation mutation = entry.getValue().poll();
+                int ttl = mutation.smallestGCGS();
+
+                assertEquals(timestamp, hint.creationTime);
+                assertEquals(ttl, hint.gcgs);
+                assertMutationsEqual(mutation, hint.mutation);
+            }
+        }
+    }
+
+    // legacy hint mutation creation code, copied more or less verbatim from the previous
implementation
+    private static Mutation createLegacyHint(Mutation mutation, long now, UUID targetId)
+    {
+        int version = MessagingService.VERSION_21;
+        int ttl = mutation.smallestGCGS();
+        UUID hintId = UUIDGen.getTimeUUID();
+
+        ByteBuffer key = UUIDType.instance.decompose(targetId);
+        Clustering clustering = SystemKeyspace.LegacyHints.comparator.make(hintId, version);
+        ByteBuffer value = ByteBuffer.wrap(FBUtilities.serialize(mutation, Mutation.serializer,
version));
+        Cell cell = BufferCell.expiring(SystemKeyspace.LegacyHints.compactValueColumn(),
+                                        now,
+                                        ttl,
+                                        FBUtilities.nowInSeconds(),
+                                        value);
+        return new Mutation(PartitionUpdate.singleRowUpdate(SystemKeyspace.LegacyHints,
+                                                            key,
+                                                            BTreeRow.singleCellRow(clustering,
cell)));
+    }
+
+    private static Mutation createMutation(int index, long timestamp)
+    {
+        CFMetaData table = Schema.instance.getCFMetaData(KEYSPACE, TABLE);
+        return new RowUpdateBuilder(table, timestamp, bytes(index))
+               .clustering(bytes(index))
+               .add("val", bytes(index))
+               .build();
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/96d41f0e/test/unit/org/apache/cassandra/metrics/HintedHandOffMetricsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/metrics/HintedHandOffMetricsTest.java b/test/unit/org/apache/cassandra/metrics/HintedHandOffMetricsTest.java
new file mode 100644
index 0000000..6f76db4
--- /dev/null
+++ b/test/unit/org/apache/cassandra/metrics/HintedHandOffMetricsTest.java
@@ -0,0 +1,56 @@
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+package org.apache.cassandra.metrics;
+
+import java.net.InetAddress;
+import java.util.Map;
+import java.util.UUID;
+
+import org.junit.Test;
+
+import com.google.common.collect.Iterators;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.UntypedResultSet;
+import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.db.marshal.Int32Type;
+import org.apache.cassandra.db.marshal.UUIDType;
+import org.apache.cassandra.hints.HintsService;
+
+import static org.junit.Assert.assertEquals;
+import static org.apache.cassandra.cql3.QueryProcessor.executeInternal;
+
+public class HintedHandOffMetricsTest
+{
+    @Test
+    public void testHintsMetrics() throws Exception
+    {
+        DatabaseDescriptor.getHintsDirectory().mkdirs();
+
+        for (int i = 0; i < 99; i++)
+            HintsService.instance.metrics.incrPastWindow(InetAddress.getLocalHost());
+        HintsService.instance.metrics.log();
+
+        UntypedResultSet rows = executeInternal("SELECT hints_dropped FROM system." + SystemKeyspace.PEER_EVENTS);
+        Map<UUID, Integer> returned = rows.one().getMap("hints_dropped", UUIDType.instance,
Int32Type.instance);
+        assertEquals(Iterators.getLast(returned.values().iterator()).intValue(), 99);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/96d41f0e/test/unit/org/apache/cassandra/service/StorageProxyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/StorageProxyTest.java b/test/unit/org/apache/cassandra/service/StorageProxyTest.java
index 801fc53..42eb1f5 100644
--- a/test/unit/org/apache/cassandra/service/StorageProxyTest.java
+++ b/test/unit/org/apache/cassandra/service/StorageProxyTest.java
@@ -25,9 +25,9 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
 import org.apache.cassandra.db.PartitionPosition;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.dht.*;
 import org.apache.cassandra.locator.TokenMetadata;
-import org.apache.cassandra.utils.ByteBufferUtil;
 
 import static org.apache.cassandra.Util.rp;
 import static org.apache.cassandra.Util.token;
@@ -78,6 +78,7 @@ public class StorageProxyTest
     @BeforeClass
     public static void beforeClass() throws Throwable
     {
+        DatabaseDescriptor.getHintsDirectory().mkdir();
         TokenMetadata tmd = StorageService.instance.getTokenMetadata();
         tmd.updateNormalToken(token("1"), InetAddress.getByName("127.0.0.1"));
         tmd.updateNormalToken(token("6"), InetAddress.getByName("127.0.0.6"));


Mime
View raw message