cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yu...@apache.org
Subject [2/3] cassandra git commit: Use the same repairedAt timestamp within incremental repair session
Date Thu, 20 Aug 2015 00:02:57 GMT
Use the same repairedAt timestamp within incremental repair session

patch by prmg; reviewed by yukim for CASSANDRA-9111


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/13172bd9
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/13172bd9
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/13172bd9

Branch: refs/heads/trunk
Commit: 13172bd993f86d44245e7140898c03db1a47073a
Parents: 4cc2b67
Author: prmg <prmg87@gmail.com>
Authored: Wed Aug 19 18:12:36 2015 -0500
Committer: Yuki Morishita <yukim@apache.org>
Committed: Wed Aug 19 18:12:36 2015 -0500

----------------------------------------------------------------------
 CHANGES.txt                                               |  1 +
 .../apache/cassandra/repair/RepairMessageVerbHandler.java |  3 ++-
 .../apache/cassandra/repair/messages/PrepareMessage.java  | 10 ++++++++--
 .../org/apache/cassandra/service/ActiveRepairService.java |  9 +++++----
 .../db/compaction/LeveledCompactionStrategyTest.java      |  2 +-
 .../org/apache/cassandra/repair/LocalSyncTaskTest.java    |  2 +-
 6 files changed, 18 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/13172bd9/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 0d17235..cea8c73 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -17,6 +17,7 @@
  * Replace usage of Adler32 with CRC32 (CASSANDRA-8684)
  * Fix migration to new format from 2.1 SSTable (CASSANDRA-10006)
  * SequentialWriter should extend BufferedDataOutputStreamPlus (CASSANDRA-9500)
+ * Use the same repairedAt timestamp within incremental repair session (CASSANDRA-9111)
 Merged from 2.2:
  * Fix histogram overflow exception (CASSANDRA-9973)
  * Route gossip messages over dedicated socket (CASSANDRA-9237)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/13172bd9/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
index 28a3bf5..942d902 100644
--- a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
+++ b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
@@ -72,7 +72,8 @@ public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage>
                     ActiveRepairService.instance.registerParentRepairSession(prepareMessage.parentRepairSession,
                             columnFamilyStores,
                             prepareMessage.ranges,
-                            prepareMessage.isIncremental);
+                            prepareMessage.isIncremental,
+                            prepareMessage.timestamp);
                     MessagingService.instance().sendReply(new MessageOut(MessagingService.Verb.INTERNAL_RESPONSE),
id, message.from);
                     break;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/13172bd9/src/java/org/apache/cassandra/repair/messages/PrepareMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/messages/PrepareMessage.java b/src/java/org/apache/cassandra/repair/messages/PrepareMessage.java
index cd1b99d..0cd73db 100644
--- a/src/java/org/apache/cassandra/repair/messages/PrepareMessage.java
+++ b/src/java/org/apache/cassandra/repair/messages/PrepareMessage.java
@@ -40,14 +40,16 @@ public class PrepareMessage extends RepairMessage
 
     public final UUID parentRepairSession;
     public final boolean isIncremental;
+    public final long timestamp;
 
-    public PrepareMessage(UUID parentRepairSession, List<UUID> cfIds, Collection<Range<Token>>
ranges, boolean isIncremental)
+    public PrepareMessage(UUID parentRepairSession, List<UUID> cfIds, Collection<Range<Token>>
ranges, boolean isIncremental, long timestamp)
     {
         super(Type.PREPARE_MESSAGE, null);
         this.parentRepairSession = parentRepairSession;
         this.cfIds = cfIds;
         this.ranges = ranges;
         this.isIncremental = isIncremental;
+        this.timestamp = timestamp;
     }
 
     public static class PrepareMessageSerializer implements MessageSerializer<PrepareMessage>
@@ -65,6 +67,7 @@ public class PrepareMessage extends RepairMessage
                 Range.tokenSerializer.serialize(r, out, version);
             }
             out.writeBoolean(message.isIncremental);
+            out.writeLong(message.timestamp);
         }
 
         public PrepareMessage deserialize(DataInputPlus in, int version) throws IOException
@@ -79,7 +82,8 @@ public class PrepareMessage extends RepairMessage
             for (int i = 0; i < rangeCount; i++)
                 ranges.add((Range<Token>) Range.tokenSerializer.deserialize(in, MessagingService.globalPartitioner(),
version));
             boolean isIncremental = in.readBoolean();
-            return new PrepareMessage(parentRepairSession, cfIds, ranges, isIncremental);
+            long timestamp = in.readLong();
+            return new PrepareMessage(parentRepairSession, cfIds, ranges, isIncremental,
timestamp);
         }
 
         public long serializedSize(PrepareMessage message, int version)
@@ -93,6 +97,7 @@ public class PrepareMessage extends RepairMessage
             for (Range<Token> r : message.ranges)
                 size += Range.tokenSerializer.serializedSize(r, version);
             size += TypeSizes.sizeof(message.isIncremental);
+            size += TypeSizes.sizeof(message.timestamp);
             return size;
         }
     }
@@ -105,6 +110,7 @@ public class PrepareMessage extends RepairMessage
                 ", ranges=" + ranges +
                 ", parentRepairSession=" + parentRepairSession +
                 ", isIncremental="+isIncremental +
+                ", timestamp=" + timestamp +
                 '}';
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/13172bd9/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index e75d13e..0e09cf7 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -233,7 +233,8 @@ public class ActiveRepairService
 
     public synchronized UUID prepareForRepair(UUID parentRepairSession, Set<InetAddress>
endpoints, RepairOption options, List<ColumnFamilyStore> columnFamilyStores)
     {
-        registerParentRepairSession(parentRepairSession, columnFamilyStores, options.getRanges(),
options.isIncremental());
+        long timestamp = System.currentTimeMillis();
+        registerParentRepairSession(parentRepairSession, columnFamilyStores, options.getRanges(),
options.isIncremental(), timestamp);
         final CountDownLatch prepareLatch = new CountDownLatch(endpoints.size());
         final AtomicBoolean status = new AtomicBoolean(true);
         final Set<String> failedNodes = Collections.synchronizedSet(new HashSet<String>());
@@ -263,7 +264,7 @@ public class ActiveRepairService
 
         for (InetAddress neighbour : endpoints)
         {
-            PrepareMessage message = new PrepareMessage(parentRepairSession, cfIds, options.getRanges(),
options.isIncremental());
+            PrepareMessage message = new PrepareMessage(parentRepairSession, cfIds, options.getRanges(),
options.isIncremental(), timestamp);
             MessageOut<RepairMessage> msg = message.createMessage();
             MessagingService.instance().sendRR(msg, neighbour, callback, TimeUnit.HOURS.toMillis(1),
true);
         }
@@ -286,9 +287,9 @@ public class ActiveRepairService
         return parentRepairSession;
     }
 
-    public void registerParentRepairSession(UUID parentRepairSession, List<ColumnFamilyStore>
columnFamilyStores, Collection<Range<Token>> ranges, boolean isIncremental)
+    public void registerParentRepairSession(UUID parentRepairSession, List<ColumnFamilyStore>
columnFamilyStores, Collection<Range<Token>> ranges, boolean isIncremental, long
timestamp)
     {
-        parentRepairSessions.put(parentRepairSession, new ParentRepairSession(columnFamilyStores,
ranges, isIncremental, System.currentTimeMillis()));
+        parentRepairSessions.put(parentRepairSession, new ParentRepairSession(columnFamilyStores,
ranges, isIncremental, timestamp));
     }
 
     public Set<SSTableReader> currentlyRepairing(UUID cfId, UUID parentRepairSession)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/13172bd9/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
index bb15e88..9d5e5fc 100644
--- a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
@@ -185,7 +185,7 @@ public class LeveledCompactionStrategyTest
         Range<Token> range = new Range<>(Util.token(""), Util.token(""));
         int gcBefore = keyspace.getColumnFamilyStore(CF_STANDARDDLEVELED).gcBefore(FBUtilities.nowInSeconds());
         UUID parentRepSession = UUID.randomUUID();
-        ActiveRepairService.instance.registerParentRepairSession(parentRepSession, Arrays.asList(cfs),
Arrays.asList(range), false);
+        ActiveRepairService.instance.registerParentRepairSession(parentRepSession, Arrays.asList(cfs),
Arrays.asList(range), false, System.currentTimeMillis());
         RepairJobDesc desc = new RepairJobDesc(parentRepSession, UUID.randomUUID(), KEYSPACE1,
CF_STANDARDDLEVELED, Arrays.asList(range));
         Validator validator = new Validator(desc, FBUtilities.getBroadcastAddress(), gcBefore);
         CompactionManager.instance.submitValidation(cfs, validator).get();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/13172bd9/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java b/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java
index 734e91b..ff5b99e 100644
--- a/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java
+++ b/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java
@@ -89,7 +89,7 @@ public class LocalSyncTaskTest extends SchemaLoader
         Keyspace keyspace = Keyspace.open(KEYSPACE1);
         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore("Standard1");
 
-        ActiveRepairService.instance.registerParentRepairSession(parentRepairSession, Arrays.asList(cfs),
Arrays.asList(range), false);
+        ActiveRepairService.instance.registerParentRepairSession(parentRepairSession, Arrays.asList(cfs),
Arrays.asList(range), false, System.currentTimeMillis());
 
         RepairJobDesc desc = new RepairJobDesc(parentRepairSession, UUID.randomUUID(), KEYSPACE1,
"Standard1", Arrays.asList(range));
 


Mime
View raw message