cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From alek...@apache.org
Subject [2/3] git commit: Merge branch 'cassandra-1.2' into cassandra-2.0
Date Fri, 17 Jan 2014 16:54:52 GMT
Merge branch 'cassandra-1.2' into cassandra-2.0

Conflicts:
	src/java/org/apache/cassandra/db/BatchlogManager.java


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/88146236
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/88146236
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/88146236

Branch: refs/heads/trunk
Commit: 88146236988eb66c7396897abe66c05dd6a255f9
Parents: 14c6f70 b0b168f
Author: Aleksey Yeschenko <aleksey@apache.org>
Authored: Fri Jan 17 19:40:50 2014 +0300
Committer: Aleksey Yeschenko <aleksey@apache.org>
Committed: Fri Jan 17 19:40:50 2014 +0300

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/db/BatchlogManager.java    | 78 ++++++++++++++-----
 .../cassandra/db/BatchlogManagerTest.java       | 82 ++++++++++++++++++++
 3 files changed, 143 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/88146236/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 2095be7,f550863..80bc626
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -15,29 -11,10 +15,30 @@@ Merged from 1.2
   * Fix executing LOCAL_QUORUM with SimpleStrategy (CASSANDRA-6545)
   * Avoid StackOverflow when using large IN queries (CASSANDRA-6567)
   * Nodetool upgradesstables includes secondary indexes (CASSANDRA-6589)
+  * Paginate batchlog replay (CASSANDRA-6569)
  
  
 -1.2.13
 +2.0.4
 + * Allow removing snapshots of no-longer-existing CFs (CASSANDRA-6418)
 + * add StorageService.stopDaemon() (CASSANDRA-4268)
 + * add IRE for invalid CF supplied to get_count (CASSANDRA-5701)
 + * add client encryption support to sstableloader (CASSANDRA-6378)
 + * Fix accept() loop for SSL sockets post-shutdown (CASSANDRA-6468)
 + * Fix size-tiered compaction in LCS L0 (CASSANDRA-6496)
 + * Fix assertion failure in filterColdSSTables (CASSANDRA-6483)
 + * Fix row tombstones in larger-than-memory compactions (CASSANDRA-6008)
 + * Fix cleanup ClassCastException (CASSANDRA-6462)
 + * Reduce gossip memory use by interning VersionedValue strings (CASSANDRA-6410)
 + * Allow specifying datacenters to participate in a repair (CASSANDRA-6218)
 + * Fix divide-by-zero in PCI (CASSANDRA-6403)
 + * Fix setting last compacted key in the wrong level for LCS (CASSANDRA-6284)
 + * Add millisecond precision formats to the timestamp parser (CASSANDRA-6395)
 + * Expose a total memtable size metric for a CF (CASSANDRA-6391)
 + * cqlsh: handle symlinks properly (CASSANDRA-6425)
 + * Fix potential infinite loop when paging query with IN (CASSANDRA-6464)
 + * Fix assertion error in AbstractQueryPager.discardFirst (CASSANDRA-6447)
 + * Fix streaming older SSTable yields unnecessary tombstones (CASSANDRA-6527)
 +Merged from 1.2:
   * Improved error message on bad properties in DDL queries (CASSANDRA-6453)
   * Randomize batchlog candidates selection (CASSANDRA-6481)
   * Fix thundering herd on endpoint cache invalidation (CASSANDRA-6345, 6485)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/88146236/src/java/org/apache/cassandra/db/BatchlogManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/BatchlogManager.java
index cfa049a,90dfd47..23cacca
--- a/src/java/org/apache/cassandra/db/BatchlogManager.java
+++ b/src/java/org/apache/cassandra/db/BatchlogManager.java
@@@ -124,16 -126,23 +125,21 @@@ public class BatchlogManager implement
  
      public static RowMutation getBatchlogMutationFor(Collection<RowMutation> mutations, UUID uuid)
      {
-         long timestamp = FBUtilities.timestampMicros();
-         ByteBuffer writtenAt = LongType.instance.decompose(timestamp / 1000);
+         return getBatchlogMutationFor(mutations, uuid, FBUtilities.timestampMicros());
+     }
+ 
+     @VisibleForTesting
+     static RowMutation getBatchlogMutationFor(Collection<RowMutation> mutations, UUID uuid, long now)
+     {
+         ByteBuffer writtenAt = LongType.instance.decompose(now / 1000);
          ByteBuffer data = serializeRowMutations(mutations);
  
 -        ColumnFamily cf = ColumnFamily.create(CFMetaData.BatchlogCf);
 +        ColumnFamily cf = ArrayBackedSortedColumns.factory.create(CFMetaData.BatchlogCf);
-         cf.addColumn(new Column(columnName(""), ByteBufferUtil.EMPTY_BYTE_BUFFER, timestamp));
-         cf.addColumn(new Column(columnName("data"), data, timestamp));
-         cf.addColumn(new Column(columnName("written_at"), writtenAt, timestamp));
+         cf.addColumn(new Column(columnName(""), ByteBufferUtil.EMPTY_BYTE_BUFFER, now));
 -        cf.addColumn(new Column(columnName("written_at"), writtenAt, now));
+         cf.addColumn(new Column(columnName("data"), data, now));
 -        RowMutation rm = new RowMutation(Table.SYSTEM_KS, UUIDType.instance.decompose(uuid));
 -        rm.add(cf);
++        cf.addColumn(new Column(columnName("written_at"), writtenAt, now));
  
 -        return rm;
 +        return new RowMutation(Keyspace.SYSTEM_KS, UUIDType.instance.decompose(uuid), cf);
      }
  
      private static ByteBuffer serializeRowMutations(Collection<RowMutation> mutations)
@@@ -169,9 -179,25 +176,25 @@@
  
          try
          {
-             for (UntypedResultSet.Row row : process("SELECT id, written_at FROM %s.%s", Keyspace.SYSTEM_KS, SystemKeyspace.BATCHLOG_CF))
-                 if (System.currentTimeMillis() > row.getLong("written_at") + TIMEOUT)
-                     replayBatch(row.getUUID("id"), rateLimiter);
+             UntypedResultSet page = process("SELECT id, data, written_at FROM %s.%s LIMIT %d",
 -                                            Table.SYSTEM_KS,
 -                                            SystemTable.BATCHLOG_CF,
++                                            Keyspace.SYSTEM_KS,
++                                            SystemKeyspace.BATCHLOG_CF,
+                                             PAGE_SIZE);
+ 
+             while (!page.isEmpty())
+             {
+                 UUID id = processBatchlogPage(page, rateLimiter);
+ 
+                 if (page.size() < PAGE_SIZE)
+                     break; // we've exhausted the batchlog, next query would be empty.
+ 
+                 page = process("SELECT id, data, written_at FROM %s.%s WHERE token(id) > token(%s) LIMIT %d",
 -                               Table.SYSTEM_KS,
 -                               SystemTable.BATCHLOG_CF,
++                               Keyspace.SYSTEM_KS,
++                               SystemKeyspace.BATCHLOG_CF,
+                                id,
+                                PAGE_SIZE);
+             }
+ 
              cleanup();
          }
          finally
@@@ -205,6 -243,13 +240,13 @@@
          totalBatchesReplayed.incrementAndGet();
      }
  
+     private void deleteBatch(UUID id)
+     {
 -        RowMutation mutation = new RowMutation(Table.SYSTEM_KS, UUIDType.instance.decompose(id));
 -        mutation.delete(new QueryPath(SystemTable.BATCHLOG_CF, null, null), System.currentTimeMillis());
++        RowMutation mutation = new RowMutation(Keyspace.SYSTEM_KS, UUIDType.instance.decompose(id));
++        mutation.delete(SystemKeyspace.BATCHLOG_CF, System.currentTimeMillis());
+         mutation.apply();
+     }
+ 
      private void replaySerializedMutations(ByteBuffer data, long writtenAt, RateLimiter rateLimiter) throws IOException
      {
          DataInputStream in = new DataInputStream(ByteBufferUtil.inputStream(data));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/88146236/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/BatchlogManagerTest.java
index 0000000,637815b..fd1fa81
mode 000000,100644..100644
--- a/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java
+++ b/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java
@@@ -1,0 -1,82 +1,82 @@@
+ package org.apache.cassandra.db;
+ 
+ import java.net.InetAddress;
+ import java.util.Collections;
+ 
+ import org.junit.Before;
+ import org.junit.Test;
+ 
+ import org.apache.cassandra.SchemaLoader;
+ import org.apache.cassandra.Util;
+ import org.apache.cassandra.config.DatabaseDescriptor;
+ import org.apache.cassandra.cql3.QueryProcessor;
+ import org.apache.cassandra.cql3.UntypedResultSet;
 -import org.apache.cassandra.db.filter.QueryPath;
+ import org.apache.cassandra.locator.TokenMetadata;
+ import org.apache.cassandra.service.StorageService;
+ import org.apache.cassandra.utils.UUIDGen;
+ 
+ import static org.junit.Assert.assertEquals;
+ import static org.junit.Assert.assertTrue;
+ 
+ import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
+ 
+ public class BatchlogManagerTest extends SchemaLoader
+ {
+     @Before
+     public void setUp() throws Exception
+     {
+         TokenMetadata metadata = StorageService.instance.getTokenMetadata();
+         InetAddress localhost = InetAddress.getByName("127.0.0.1");
+         metadata.updateNormalToken(Util.token("A"), localhost);
+         metadata.updateHostId(UUIDGen.getTimeUUID(), localhost);
+     }
+ 
+     @Test
+     public void testReplay() throws Exception
+     {
+         assertEquals(0, BatchlogManager.instance.countAllBatches());
+         assertEquals(0, BatchlogManager.instance.getTotalBatchesReplayed());
+ 
+         // Generate 1000 mutations and put them all into the batchlog.
+         // Half (500) ready to be replayed, half not.
+         for (int i = 0; i < 1000; i++)
+         {
+             RowMutation mutation = new RowMutation("Keyspace1", bytes(i));
 -            mutation.add(new QueryPath("Standard1", null, bytes(i)), bytes(i), 0);
++            mutation.add("Standard1", bytes(i), bytes(i), System.currentTimeMillis());
++
+             long timestamp = System.currentTimeMillis();
+             if (i < 500)
+                 timestamp -= DatabaseDescriptor.getWriteRpcTimeout() * 2;
+             BatchlogManager.getBatchlogMutationFor(Collections.singleton(mutation), UUIDGen.getTimeUUID(), timestamp * 1000).apply();
+         }
+ 
+         assertEquals(1000, BatchlogManager.instance.countAllBatches());
+         assertEquals(0, BatchlogManager.instance.getTotalBatchesReplayed());
+ 
+         // Force batchlog replay.
+         BatchlogManager.instance.replayAllFailedBatches();
+ 
+         // Ensure that the first half, and only the first half, got replayed.
+         assertEquals(500, BatchlogManager.instance.countAllBatches());
+         assertEquals(500, BatchlogManager.instance.getTotalBatchesReplayed());
+ 
+         for (int i = 0; i < 1000; i++)
+         {
+             UntypedResultSet result = QueryProcessor.processInternal(String.format("SELECT * FROM \"Keyspace1\".\"Standard1\" WHERE key = intAsBlob(%d)", i));
+             if (i < 500)
+             {
+                 assertEquals(bytes(i), result.one().getBytes("key"));
+                 assertEquals(bytes(i), result.one().getBytes("column1"));
+                 assertEquals(bytes(i), result.one().getBytes("value"));
+             }
+             else
+             {
+                 assertTrue(result.isEmpty());
+             }
+         }
+ 
+         // Ensure that no stray mutations got somehow applied.
+         UntypedResultSet result = QueryProcessor.processInternal(String.format("SELECT count(*) FROM \"Keyspace1\".\"Standard1\""));
+         assertEquals(500, result.one().getLong("count"));
+     }
+ }


Mime
View raw message