cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From alek...@apache.org
Subject [1/4] git commit: 2.0 compatibility modifications for CASSANDRA-6931
Date Mon, 31 Mar 2014 10:13:06 GMT
Repository: cassandra
Updated Branches:
  refs/heads/trunk eeef4061b -> a2a463a66


2.0 compatibility modifications for CASSANDRA-6931

patch by Aleksey Yeschenko; reviewed by Sylvain Lebresne for
CASSANDRA-6931


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d049017a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d049017a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d049017a

Branch: refs/heads/trunk
Commit: d049017ac85ce22e7dcf87879e94b386987b19e6
Parents: 6874aaa
Author: Aleksey Yeschenko <aleksey@apache.org>
Authored: Mon Mar 31 12:53:24 2014 +0300
Committer: Aleksey Yeschenko <aleksey@apache.org>
Committed: Mon Mar 31 12:53:24 2014 +0300

----------------------------------------------------------------------
 .../org/apache/cassandra/config/CFMetaData.java |  3 ++-
 .../apache/cassandra/db/BatchlogManager.java    | 22 ++++++++++----------
 2 files changed, 13 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/d049017a/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java
index ff40e65..1f25cea 100644
--- a/src/java/org/apache/cassandra/config/CFMetaData.java
+++ b/src/java/org/apache/cassandra/config/CFMetaData.java
@@ -233,7 +233,8 @@ public final class CFMetaData
     public static final CFMetaData BatchlogCf = compile("CREATE TABLE " + SystemKeyspace.BATCHLOG_CF + " ("
                                                         + "id uuid PRIMARY KEY,"
                                                         + "written_at timestamp,"
-                                                        + "data blob"
+                                                        + "data blob,"
+                                                        + "version int,"
                                                        + ") WITH COMMENT='uncommited batches' AND gc_grace_seconds=0 "
                                                        + "AND COMPACTION={'class' : 'SizeTieredCompactionStrategy', 'min_threshold' : 2}");
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d049017a/src/java/org/apache/cassandra/db/BatchlogManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/BatchlogManager.java b/src/java/org/apache/cassandra/db/BatchlogManager.java
index 23cacca..2e09285 100644
--- a/src/java/org/apache/cassandra/db/BatchlogManager.java
+++ b/src/java/org/apache/cassandra/db/BatchlogManager.java
@@ -66,7 +66,6 @@ import org.apache.cassandra.utils.WrappedRunnable;
 public class BatchlogManager implements BatchlogManagerMBean
 {
     private static final String MBEAN_NAME = "org.apache.cassandra.db:type=BatchlogManager";
-    private static final int VERSION = MessagingService.VERSION_12;
     private static final long REPLAY_INTERVAL = 60 * 1000; // milliseconds
     private static final int PAGE_SIZE = 128; // same as HHOM, for now, w/out using any heuristics. TODO: set based on avg batch size.
 
@@ -151,7 +150,7 @@ public class BatchlogManager implements BatchlogManagerMBean
         {
             out.writeInt(mutations.size());
             for (RowMutation rm : mutations)
-                RowMutation.serializer.serialize(rm, out, VERSION);
+                RowMutation.serializer.serialize(rm, out, MessagingService.VERSION_12);
         }
         catch (IOException e)
         {
@@ -176,7 +175,7 @@ public class BatchlogManager implements BatchlogManagerMBean
 
         try
         {
-            UntypedResultSet page = process("SELECT id, data, written_at FROM %s.%s LIMIT %d",
+            UntypedResultSet page = process("SELECT id, data, written_at, version FROM %s.%s LIMIT %d",
                                             Keyspace.SYSTEM_KS,
                                             SystemKeyspace.BATCHLOG_CF,
                                             PAGE_SIZE);
@@ -188,7 +187,7 @@ public class BatchlogManager implements BatchlogManagerMBean
                 if (page.size() < PAGE_SIZE)
                     break; // we've exhausted the batchlog, next query would be empty.
 
-                page = process("SELECT id, data, written_at FROM %s.%s WHERE token(id) > token(%s) LIMIT %d",
+                page = process("SELECT id, data, written_at, version FROM %s.%s WHERE token(id) > token(%s) LIMIT %d",
                                Keyspace.SYSTEM_KS,
                                SystemKeyspace.BATCHLOG_CF,
                                id,
@@ -213,22 +212,23 @@ public class BatchlogManager implements BatchlogManagerMBean
         {
             id = row.getUUID("id");
             long writtenAt = row.getLong("written_at");
+            int version = row.has("version") ? row.getInt("version") : MessagingService.VERSION_12;
             // enough time for the actual write + batchlog entry mutation delivery (two separate requests).
             long timeout = DatabaseDescriptor.getWriteRpcTimeout() * 2; // enough time for the actual write + BM removal mutation
             if (System.currentTimeMillis() < writtenAt + timeout)
                 continue; // not ready to replay yet, might still get a deletion.
-            replayBatch(id, row.getBytes("data"), writtenAt, rateLimiter);
+            replayBatch(id, row.getBytes("data"), writtenAt, version, rateLimiter);
         }
         return id;
     }
 
-    private void replayBatch(UUID id, ByteBuffer data, long writtenAt, RateLimiter rateLimiter)
+    private void replayBatch(UUID id, ByteBuffer data, long writtenAt, int version, RateLimiter rateLimiter)
     {
         logger.debug("Replaying batch {}", id);
 
         try
         {
-            replaySerializedMutations(data, writtenAt, rateLimiter);
+            replaySerializedMutations(data, writtenAt, version, rateLimiter);
         }
         catch (IOException e)
         {
@@ -247,19 +247,19 @@ public class BatchlogManager implements BatchlogManagerMBean
         mutation.apply();
     }
 
-    private void replaySerializedMutations(ByteBuffer data, long writtenAt, RateLimiter rateLimiter) throws IOException
+    private void replaySerializedMutations(ByteBuffer data, long writtenAt, int version, RateLimiter rateLimiter) throws IOException
     {
         DataInputStream in = new DataInputStream(ByteBufferUtil.inputStream(data));
         int size = in.readInt();
         for (int i = 0; i < size; i++)
-            replaySerializedMutation(RowMutation.serializer.deserialize(in, VERSION), writtenAt, rateLimiter);
+            replaySerializedMutation(RowMutation.serializer.deserialize(in, version), writtenAt, version, rateLimiter);
     }
 
     /*
      * We try to deliver the mutations to the replicas ourselves if they are alive and only resort to writing hints
      * when a replica is down or a write request times out.
      */
-    private void replaySerializedMutation(RowMutation mutation, long writtenAt, RateLimiter rateLimiter)
+    private void replaySerializedMutation(RowMutation mutation, long writtenAt, int version, RateLimiter rateLimiter)
     {
         int ttl = calculateHintTTL(mutation, writtenAt);
         if (ttl <= 0)
@@ -268,7 +268,7 @@ public class BatchlogManager implements BatchlogManagerMBean
         Set<InetAddress> liveEndpoints = new HashSet<>();
         String ks = mutation.getKeyspaceName();
         Token<?> tk = StorageService.getPartitioner().getToken(mutation.key());
-        int mutationSize = (int) RowMutation.serializer.serializedSize(mutation, VERSION);
+        int mutationSize = (int) RowMutation.serializer.serializedSize(mutation, version);
 
         for (InetAddress endpoint : Iterables.concat(StorageService.instance.getNaturalEndpoints(ks, tk),
                                                      StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, ks)))


Mime
View raw message