cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From alek...@apache.org
Subject [3/3] git commit: Merge branch 'cassandra-2.0' into trunk
Date Fri, 17 Jan 2014 16:54:53 GMT
Merge branch 'cassandra-2.0' into trunk

Conflicts:
	src/java/org/apache/cassandra/db/BatchlogManager.java


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5c112047
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5c112047
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5c112047

Branch: refs/heads/trunk
Commit: 5c112047f8b47de3b400d96e2d5533cd8004b3bf
Parents: bf14289 8814623
Author: Aleksey Yeschenko <aleksey@apache.org>
Authored: Fri Jan 17 19:54:30 2014 +0300
Committer: Aleksey Yeschenko <aleksey@apache.org>
Committed: Fri Jan 17 19:54:30 2014 +0300

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/db/BatchlogManager.java    | 78 +++++++++++++-----
 .../cassandra/db/BatchlogManagerTest.java       | 84 ++++++++++++++++++++
 3 files changed, 145 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/5c112047/CHANGES.txt
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5c112047/src/java/org/apache/cassandra/db/BatchlogManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/BatchlogManager.java
index 4ce7f41,23cacca..86b7b17
--- a/src/java/org/apache/cassandra/db/BatchlogManager.java
+++ b/src/java/org/apache/cassandra/db/BatchlogManager.java
@@@ -122,21 -123,26 +123,26 @@@ public class BatchlogManager implement
          batchlogTasks.execute(runnable);
      }
  
 -    public static RowMutation getBatchlogMutationFor(Collection<RowMutation> mutations, UUID uuid)
 +    public static Mutation getBatchlogMutationFor(Collection<Mutation> mutations, UUID uuid)
      {
-         long timestamp = FBUtilities.timestampMicros();
-         ByteBuffer writtenAt = LongType.instance.decompose(timestamp / 1000);
+         return getBatchlogMutationFor(mutations, uuid, FBUtilities.timestampMicros());
+     }
+ 
+     @VisibleForTesting
 -    static RowMutation getBatchlogMutationFor(Collection<RowMutation> mutations, UUID uuid, long now)
++    static Mutation getBatchlogMutationFor(Collection<Mutation> mutations, UUID uuid, long now)
+     {
+         ByteBuffer writtenAt = LongType.instance.decompose(now / 1000);
 -        ByteBuffer data = serializeRowMutations(mutations);
 +        ByteBuffer data = serializeMutations(mutations);
  
          ColumnFamily cf = ArrayBackedSortedColumns.factory.create(CFMetaData.BatchlogCf);
-         cf.addColumn(new Cell(cellName(""), ByteBufferUtil.EMPTY_BYTE_BUFFER, timestamp));
-         cf.addColumn(new Cell(cellName("data"), data, timestamp));
-         cf.addColumn(new Cell(cellName("written_at"), writtenAt, timestamp));
 -        cf.addColumn(new Column(columnName(""), ByteBufferUtil.EMPTY_BYTE_BUFFER, now));
 -        cf.addColumn(new Column(columnName("data"), data, now));
 -        cf.addColumn(new Column(columnName("written_at"), writtenAt, now));
++        cf.addColumn(new Cell(cellName(""), ByteBufferUtil.EMPTY_BYTE_BUFFER, now));
++        cf.addColumn(new Cell(cellName("data"), data, now));
++        cf.addColumn(new Cell(cellName("written_at"), writtenAt, now));
  
 -        return new RowMutation(Keyspace.SYSTEM_KS, UUIDType.instance.decompose(uuid), cf);
 +        return new Mutation(Keyspace.SYSTEM_KS, UUIDType.instance.decompose(uuid), cf);
      }
  
 -    private static ByteBuffer serializeRowMutations(Collection<RowMutation> mutations)
 +    private static ByteBuffer serializeMutations(Collection<Mutation> mutations)
      {
          FastByteArrayOutputStream bos = new FastByteArrayOutputStream();
          DataOutputStream out = new DataOutputStream(bos);
@@@ -205,6 -240,13 +240,13 @@@
          totalBatchesReplayed.incrementAndGet();
      }
  
+     private void deleteBatch(UUID id)
+     {
 -        RowMutation mutation = new RowMutation(Keyspace.SYSTEM_KS, UUIDType.instance.decompose(id));
++        Mutation mutation = new Mutation(Keyspace.SYSTEM_KS, UUIDType.instance.decompose(id));
+         mutation.delete(SystemKeyspace.BATCHLOG_CF, System.currentTimeMillis());
+         mutation.apply();
+     }
+ 
      private void replaySerializedMutations(ByteBuffer data, long writtenAt, RateLimiter rateLimiter) throws IOException
      {
          DataInputStream in = new DataInputStream(ByteBufferUtil.inputStream(data));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5c112047/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/BatchlogManagerTest.java
index 0000000,fd1fa81..8fee6d6
mode 000000,100644..100644
--- a/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java
+++ b/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java
@@@ -1,0 -1,82 +1,84 @@@
+ package org.apache.cassandra.db;
+ 
+ import java.net.InetAddress;
+ import java.util.Collections;
+ 
+ import org.junit.Before;
+ import org.junit.Test;
+ 
+ import org.apache.cassandra.SchemaLoader;
+ import org.apache.cassandra.Util;
+ import org.apache.cassandra.config.DatabaseDescriptor;
+ import org.apache.cassandra.cql3.QueryProcessor;
+ import org.apache.cassandra.cql3.UntypedResultSet;
++import org.apache.cassandra.db.composites.CellNameType;
+ import org.apache.cassandra.locator.TokenMetadata;
+ import org.apache.cassandra.service.StorageService;
+ import org.apache.cassandra.utils.UUIDGen;
+ 
+ import static org.junit.Assert.assertEquals;
+ import static org.junit.Assert.assertTrue;
+ 
+ import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
+ 
+ public class BatchlogManagerTest extends SchemaLoader
+ {
+     @Before
+     public void setUp() throws Exception
+     {
+         TokenMetadata metadata = StorageService.instance.getTokenMetadata();
+         InetAddress localhost = InetAddress.getByName("127.0.0.1");
+         metadata.updateNormalToken(Util.token("A"), localhost);
+         metadata.updateHostId(UUIDGen.getTimeUUID(), localhost);
+     }
+ 
+     @Test
+     public void testReplay() throws Exception
+     {
+         assertEquals(0, BatchlogManager.instance.countAllBatches());
+         assertEquals(0, BatchlogManager.instance.getTotalBatchesReplayed());
+ 
+         // Generate 1000 mutations and put them all into the batchlog.
+         // Half (500) ready to be replayed, half not.
++        CellNameType comparator = Keyspace.open("Keyspace1").getColumnFamilyStore("Standard1").metadata.comparator;
+         for (int i = 0; i < 1000; i++)
+         {
 -            RowMutation mutation = new RowMutation("Keyspace1", bytes(i));
 -            mutation.add("Standard1", bytes(i), bytes(i), System.currentTimeMillis());
++            Mutation mutation = new Mutation("Keyspace1", bytes(i));
++            mutation.add("Standard1", comparator.makeCellName(bytes(i)), bytes(i), System.currentTimeMillis());
+ 
+             long timestamp = System.currentTimeMillis();
+             if (i < 500)
+                 timestamp -= DatabaseDescriptor.getWriteRpcTimeout() * 2;
+             BatchlogManager.getBatchlogMutationFor(Collections.singleton(mutation), UUIDGen.getTimeUUID(), timestamp * 1000).apply();
+         }
+ 
+         assertEquals(1000, BatchlogManager.instance.countAllBatches());
+         assertEquals(0, BatchlogManager.instance.getTotalBatchesReplayed());
+ 
+         // Force batchlog replay.
+         BatchlogManager.instance.replayAllFailedBatches();
+ 
+         // Ensure that the first half, and only the first half, got replayed.
+         assertEquals(500, BatchlogManager.instance.countAllBatches());
+         assertEquals(500, BatchlogManager.instance.getTotalBatchesReplayed());
+ 
+         for (int i = 0; i < 1000; i++)
+         {
+             UntypedResultSet result = QueryProcessor.processInternal(String.format("SELECT * FROM \"Keyspace1\".\"Standard1\" WHERE key = intAsBlob(%d)", i));
+             if (i < 500)
+             {
+                 assertEquals(bytes(i), result.one().getBytes("key"));
+                 assertEquals(bytes(i), result.one().getBytes("column1"));
+                 assertEquals(bytes(i), result.one().getBytes("value"));
+             }
+             else
+             {
+                 assertTrue(result.isEmpty());
+             }
+         }
+ 
+         // Ensure that no stray mutations got somehow applied.
+         UntypedResultSet result = QueryProcessor.processInternal(String.format("SELECT count(*) FROM \"Keyspace1\".\"Standard1\""));
+         assertEquals(500, result.one().getLong("count"));
+     }
+ }


Mime
View raw message