accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From els...@apache.org
Subject [08/50] [abbrv] git commit: ACCUMULO-2819 Make sure that order records are deleted with the status and work records.
Date Wed, 21 May 2014 01:59:27 GMT
ACCUMULO-2819 Make sure that order records are deleted with the status and work records.

Tests for both the removal and the (de)serialization of the Order records.


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/f312cf14
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/f312cf14
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/f312cf14

Branch: refs/heads/ACCUMULO-378
Commit: f312cf14dbdbce4dcd4673635f0254c78bb0fcfb
Parents: 005b59f
Author: Josh Elser <elserj@apache.org>
Authored: Thu May 15 22:06:58 2014 -0400
Committer: Josh Elser <elserj@apache.org>
Committed: Thu May 15 22:06:58 2014 -0400

----------------------------------------------------------------------
 .../core/replication/ReplicationSchema.java     | 11 ++----
 .../core/replication/ReplicationSchemaTest.java | 21 ++++++++++
 .../RemoveCompleteReplicationRecords.java       | 27 ++++++++++---
 .../master/replication/StatusMaker.java         |  3 +-
 .../RemoveCompleteReplicationRecordsTest.java   | 41 +++++++++++++++-----
 5 files changed, 79 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/f312cf14/core/src/main/java/org/apache/accumulo/core/replication/ReplicationSchema.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/replication/ReplicationSchema.java b/core/src/main/java/org/apache/accumulo/core/replication/ReplicationSchema.java
index 725758e..51bd7db 100644
--- a/core/src/main/java/org/apache/accumulo/core/replication/ReplicationSchema.java
+++ b/core/src/main/java/org/apache/accumulo/core/replication/ReplicationSchema.java
@@ -178,18 +178,14 @@ public class ReplicationSchema {
     }
 
     /**
-     * Creates the Mutation for the Order section for the given file and time, adding the column
-     * as well using {@link OrderSection#add(Mutation, Text, Value)}
+     * Creates the Mutation for the Order section for the given file and time
      * @param file Filename
      * @param timeInMillis Time in millis that the file was closed
-     * @param tableId Source table id
-     * @param v Serialized Status msg as a Value
      * @return Mutation for the Order section
      */
-    public static Mutation createMutation(String file, long timeInMillis, Text tableId, Value v) {
+    public static Mutation createMutation(String file, long timeInMillis) {
       Preconditions.checkNotNull(file);
       Preconditions.checkArgument(timeInMillis >= 0, "timeInMillis must be greater than zero");
-      Preconditions.checkNotNull(v);
 
       // Encode the time so it sorts properly
       byte[] rowPrefix = longEncoder.encode(timeInMillis);
@@ -198,8 +194,7 @@ public class ReplicationSchema {
       row.append((ROW_SEPARATOR+file).getBytes(), 0, file.length() + ROW_SEPARATOR.length());
 
       // Make the mutation and add the column update
-      Mutation m = new Mutation(row);
-      return add(m, tableId, v);
+      return new Mutation(row);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f312cf14/core/src/test/java/org/apache/accumulo/core/replication/ReplicationSchemaTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/replication/ReplicationSchemaTest.java b/core/src/test/java/org/apache/accumulo/core/replication/ReplicationSchemaTest.java
index d5d1435..d321153 100644
--- a/core/src/test/java/org/apache/accumulo/core/replication/ReplicationSchemaTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/replication/ReplicationSchemaTest.java
@@ -17,6 +17,8 @@
 package org.apache.accumulo.core.replication;
 
 import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.replication.ReplicationSchema.OrderSection;
 import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
 import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
 import org.apache.hadoop.io.Text;
@@ -98,4 +100,23 @@ public class ReplicationSchemaTest {
     Key k = new Key("file", StatusSection.NAME.toString(), "");
     WorkSection.getFile(k, new Text());
   }
+
+  @Test
+  public void orderSerialization() {
+    long now = System.currentTimeMillis();
+    Mutation m = OrderSection.createMutation("/accumulo/file", now);
+    Key k = new Key(new Text(m.getRow()));
+    Assert.assertEquals("/accumulo/file", OrderSection.getFile(k));
+    Assert.assertEquals(now, OrderSection.getTimeClosed(k));
+  }
+
+  @Test
+  public void orderSerializationWithBuffer() {
+    Text buff = new Text();
+    long now = System.currentTimeMillis();
+    Mutation m = OrderSection.createMutation("/accumulo/file", now);
+    Key k = new Key(new Text(m.getRow()));
+    Assert.assertEquals("/accumulo/file", OrderSection.getFile(k, buff));
+    Assert.assertEquals(now, OrderSection.getTimeClosed(k, buff));
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f312cf14/server/master/src/main/java/org/apache/accumulo/master/replication/RemoveCompleteReplicationRecords.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/RemoveCompleteReplicationRecords.java b/server/master/src/main/java/org/apache/accumulo/master/replication/RemoveCompleteReplicationRecords.java
index fce72a7..5a89842 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/replication/RemoveCompleteReplicationRecords.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/replication/RemoveCompleteReplicationRecords.java
@@ -17,6 +17,7 @@
 package org.apache.accumulo.master.replication;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Map.Entry;
 import java.util.SortedMap;
@@ -32,6 +33,10 @@ import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.iterators.user.WholeRowIterator;
+import org.apache.accumulo.core.protobuf.ProtobufUtil;
+import org.apache.accumulo.core.replication.ReplicationSchema.OrderSection;
+import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
+import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
 import org.apache.accumulo.core.replication.ReplicationTarget;
 import org.apache.accumulo.core.replication.StatusUtil;
 import org.apache.accumulo.core.replication.proto.Replication.Status;
@@ -71,6 +76,8 @@ public class RemoveCompleteReplicationRecords implements Runnable {
 
     bs.setRanges(Collections.singleton(new Range()));
     IteratorSetting cfg = new IteratorSetting(50, WholeRowIterator.class);
+    StatusSection.limit(bs);
+    WorkSection.limit(bs);
     bs.addScanIterator(cfg);
 
     @SuppressWarnings("deprecation")
@@ -78,7 +85,7 @@ public class RemoveCompleteReplicationRecords implements Runnable {
     long recordsRemoved = 0;
     try {
       sw.start();
-      recordsRemoved = removeCompleteRecords(conn, ReplicationTable.NAME, bs, bw);
+      recordsRemoved = removeCompleteRecords(conn, bs, bw);
     } finally {
       if (null != bs) {
         bs.close();
@@ -109,7 +116,7 @@ public class RemoveCompleteReplicationRecords implements Runnable {
    *          A BatchWriter to write deletes to
    * @return Number of records removed
    */
-  protected long removeCompleteRecords(Connector conn, String table, BatchScanner bs, BatchWriter bw) {
+  protected long removeCompleteRecords(Connector conn, BatchScanner bs, BatchWriter bw) {
     Text row = new Text(), colf = new Text(), colq = new Text();
     long recordsRemoved = 0;
 
@@ -134,10 +141,13 @@ public class RemoveCompleteReplicationRecords implements Runnable {
 
   protected long removeRowIfNecessary(BatchWriter bw, SortedMap<Key,Value> columns, Text row, Text colf, Text colq) {
     long recordsRemoved = 0;
-    log.info("Removing {} from the replication table", row);
+    if (columns.isEmpty()) {
+      return recordsRemoved;
+    }
+
     Mutation m = new Mutation(row);
+    Status status = null;
     for (Entry<Key,Value> entry : columns.entrySet()) {
-      Status status;
       try {
         status = Status.parseFrom(entry.getValue().get());
       } catch (InvalidProtocolBufferException e) {
@@ -159,12 +169,19 @@ public class RemoveCompleteReplicationRecords implements Runnable {
       recordsRemoved++;
     }
 
+    log.info("Removing {} from the replication table", row);
+
+    ReplicationTarget target = ReplicationTarget.from(colq);
+
+    Mutation orderMutation = OrderSection.createMutation(row.toString(), status.getClosedTime());
+    orderMutation.putDelete(OrderSection.NAME, new Text(target.getSourceTableId()));
+
     // Send the mutation deleting all the columns at once.
     // If we send them not as a single Mutation, we run the risk of having some of them be applied
     // which would mean that we might accidentally re-replicate data. We want to get rid of them all at once
     // or not at all.
     try {
-      bw.addMutation(m);
+      bw.addMutations(Arrays.asList(m, orderMutation));
       bw.flush();
     } catch (MutationsRejectedException e) {
       log.error("Could not submit mutation to remove columns for {} in replication table", row, e);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f312cf14/server/master/src/main/java/org/apache/accumulo/master/replication/StatusMaker.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/StatusMaker.java b/server/master/src/main/java/org/apache/accumulo/master/replication/StatusMaker.java
index 8941a56..a7ef8cb 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/replication/StatusMaker.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/replication/StatusMaker.java
@@ -199,7 +199,8 @@ public class StatusMaker {
         log.warn("Status record ({}) for {} in table {} was written to metadata table which was closed but lacked closedTime", ProtobufUtil.toString(stat), file, tableId);
       }
 
-      Mutation m = OrderSection.createMutation(file, stat.getClosedTime(), tableId, value);
+      Mutation m = OrderSection.createMutation(file, stat.getClosedTime());
+      OrderSection.add(m, tableId, value);
 
       try {
         replicationWriter.addMutation(m);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f312cf14/server/master/src/test/java/org/apache/accumulo/master/replication/RemoveCompleteReplicationRecordsTest.java
----------------------------------------------------------------------
diff --git a/server/master/src/test/java/org/apache/accumulo/master/replication/RemoveCompleteReplicationRecordsTest.java b/server/master/src/test/java/org/apache/accumulo/master/replication/RemoveCompleteReplicationRecordsTest.java
index 078fd31..373062c 100644
--- a/server/master/src/test/java/org/apache/accumulo/master/replication/RemoveCompleteReplicationRecordsTest.java
+++ b/server/master/src/test/java/org/apache/accumulo/master/replication/RemoveCompleteReplicationRecordsTest.java
@@ -34,6 +34,7 @@ import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.iterators.user.WholeRowIterator;
 import org.apache.accumulo.core.protobuf.ProtobufUtil;
+import org.apache.accumulo.core.replication.ReplicationSchema.OrderSection;
 import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
 import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
 import org.apache.accumulo.core.replication.ReplicationTarget;
@@ -93,7 +94,7 @@ public class RemoveCompleteReplicationRecordsTest {
 
     EasyMock.replay(bw);
 
-    rcrr.removeCompleteRecords(conn, ReplicationTable.NAME, bs, bw);
+    rcrr.removeCompleteRecords(conn, bs, bw);
     bs.close();
 
     Assert.assertEquals(numRecords, Iterables.size(ReplicationTable.getScanner(conn)));
@@ -129,7 +130,7 @@ public class RemoveCompleteReplicationRecordsTest {
     EasyMock.replay(bw);
 
     // We don't remove any records, so we can just pass in a fake BW for both
-    rcrr.removeCompleteRecords(conn, ReplicationTable.NAME, bs, bw);
+    rcrr.removeCompleteRecords(conn, bs, bw);
     bs.close();
 
     Assert.assertEquals(numRecords, Iterables.size(ReplicationTable.getScanner(conn)));
@@ -181,7 +182,7 @@ public class RemoveCompleteReplicationRecordsTest {
     bs.addScanIterator(cfg);
 
     try {
-      Assert.assertEquals(0l, rcrr.removeCompleteRecords(conn, ReplicationTable.NAME, bs, replBw));
+      Assert.assertEquals(0l, rcrr.removeCompleteRecords(conn, bs, replBw));
     } finally {
       bs.close();
       replBw.close();
@@ -199,15 +200,23 @@ public class RemoveCompleteReplicationRecordsTest {
     builder.setEnd(10000);
     builder.setInfiniteEnd(false);
 
+    long time = System.currentTimeMillis();
     // Write out numRecords entries to both replication and metadata tables, none of which are fully replicated
     for (int i = 0; i < numRecords; i++) {
+      builder.setClosedTime(time++);
       String file = "/accumulo/wal/tserver+port/" + UUID.randomUUID();
       Mutation m = new Mutation(file);
-      StatusSection.add(m, new Text(Integer.toString(i)), ProtobufUtil.toValue(builder.setBegin(1000*(i+1)).build()));
+      Value v = ProtobufUtil.toValue(builder.setBegin(1000*(i+1)).build()); 
+      StatusSection.add(m, new Text(Integer.toString(i)), v);
+      replBw.addMutation(m);
+      m = OrderSection.createMutation(file, time);
+      OrderSection.add(m, new Text(Integer.toString(i)), v);
       replBw.addMutation(m);
     }
 
     Set<String> filesToRemove = new HashSet<>();
+    // We created two mutations for each file
+    numRecords *= 2;
     int finalNumRecords = numRecords;
 
     // Add two records that we can delete
@@ -215,23 +224,33 @@ public class RemoveCompleteReplicationRecordsTest {
     filesToRemove.add(fileToRemove);
     Mutation m = new Mutation(fileToRemove);
     ReplicationTarget target = new ReplicationTarget("peer1", "5", "5");
-    Value value = ProtobufUtil.toValue(builder.setBegin(10000).setEnd(10000).setClosed(true).build());
+    Value value = ProtobufUtil.toValue(builder.setBegin(10000).setEnd(10000).setClosed(true).setClosedTime(time).build());
     StatusSection.add(m, new Text("5"), value);
     WorkSection.add(m, target.toText(), value);
     replBw.addMutation(m);
 
-    numRecords += 2;
+    m = OrderSection.createMutation(fileToRemove, time);
+    OrderSection.add(m, new Text("5"), value);
+    replBw.addMutation(m);
+    time++;
+
+    numRecords += 3;
 
-    // Add a record with some stuff we replicated
     fileToRemove = "/accumulo/wal/tserver+port/" + UUID.randomUUID();
     filesToRemove.add(fileToRemove);
     m = new Mutation(fileToRemove);
+    value = ProtobufUtil.toValue(builder.setBegin(10000).setEnd(10000).setClosed(true).setClosedTime(time).build());
     target = new ReplicationTarget("peer1", "6", "6");
     StatusSection.add(m, new Text("6"), value);
     WorkSection.add(m, target.toText(), value);
     replBw.addMutation(m);
 
-    numRecords += 2;
+    m = OrderSection.createMutation(fileToRemove, time);
+    OrderSection.add(m, new Text("6"), value);
+    replBw.addMutation(m);
+    time++;
+
+    numRecords += 3;
 
     replBw.flush();
 
@@ -241,11 +260,13 @@ public class RemoveCompleteReplicationRecordsTest {
     // We should remove the two fully completed records we inserted
     BatchScanner bs = ReplicationTable.getBatchScanner(conn, 1);
     bs.setRanges(Collections.singleton(new Range()));
+    StatusSection.limit(bs);
+    WorkSection.limit(bs);
     IteratorSetting cfg = new IteratorSetting(50, WholeRowIterator.class);
     bs.addScanIterator(cfg);
 
     try {
-      Assert.assertEquals(4l, rcrr.removeCompleteRecords(conn, ReplicationTable.NAME, bs, replBw));
+      Assert.assertEquals(4l, rcrr.removeCompleteRecords(conn, bs, replBw));
     } finally {
       bs.close();
       replBw.close();
@@ -306,7 +327,7 @@ public class RemoveCompleteReplicationRecordsTest {
     bs.addScanIterator(cfg);
 
     try {
-      Assert.assertEquals(0l, rcrr.removeCompleteRecords(conn, ReplicationTable.NAME, bs, replBw));
+      Assert.assertEquals(0l, rcrr.removeCompleteRecords(conn, bs, replBw));
     } finally {
       bs.close();
       replBw.close();


Mime
View raw message