accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From e..@apache.org
Subject [2/2] accumulo git commit: ACCUMULO-3663 roll walogs for replication to start
Date Tue, 17 Mar 2015 12:41:40 GMT
ACCUMULO-3663 roll walogs for replication to start


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/ca64067f
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/ca64067f
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/ca64067f

Branch: refs/heads/master
Commit: ca64067f2222abf9968fcac6ec54307b4dabaa0c
Parents: 52b7785
Author: Eric C. Newton <eric.newton@gmail.com>
Authored: Tue Mar 17 08:39:07 2015 -0400
Committer: Eric C. Newton <eric.newton@gmail.com>
Committed: Tue Mar 17 08:39:07 2015 -0400

----------------------------------------------------------------------
 .../test/randomwalk/concurrent/Replication.java | 71 +++++++++++++++-----
 .../replication/ReplicationRandomWalkIT.java    | 14 +++-
 2 files changed, 69 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/ca64067f/test/src/main/java/org/apache/accumulo/test/randomwalk/concurrent/Replication.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/randomwalk/concurrent/Replication.java b/test/src/main/java/org/apache/accumulo/test/randomwalk/concurrent/Replication.java
index bc2d64c..a8db8e5 100644
--- a/test/src/main/java/org/apache/accumulo/test/randomwalk/concurrent/Replication.java
+++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/concurrent/Replication.java
@@ -31,6 +31,7 @@ import static org.apache.accumulo.server.replication.ReplicaSystemFactory.getPee
 import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.Random;
+import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
 import java.util.UUID;
@@ -55,6 +56,9 @@ import org.apache.hadoop.io.Text;
 
 public class Replication extends Test {
 
+  final int ROWS = 1000;
+  final int COLS = 50;
+
   @Override
   public void visit(State state, Environment env, Properties props) throws Exception {
     final Connector c = env.getConnector();
@@ -62,16 +66,20 @@ public class Replication extends Test {
     final String instName = inst.getInstanceName();
     final InstanceOperations iOps = c.instanceOperations();
     final TableOperations tOps = c.tableOperations();
+
+    // Replicate to ourselves
     iOps.setProperty(REPLICATION_NAME.getKey(), instName);
-    iOps.setProperty(MASTER_REPLICATION_SCAN_INTERVAL.getKey(), "1s");
-    iOps.setProperty(REPLICATION_WORK_ASSIGNMENT_SLEEP.getKey(), "1s");
-    iOps.setProperty(REPLICATION_WORK_PROCESSOR_DELAY.getKey(), "1s");
-    iOps.setProperty(REPLICATION_WORK_PROCESSOR_PERIOD.getKey(), "1s");
     iOps.setProperty(REPLICATION_PEERS.getKey() + instName,
         getPeerConfigurationValue(AccumuloReplicaSystem.class, instName + "," + inst.getZooKeepers()));
     iOps.setProperty(REPLICATION_PEER_USER.getKey() + instName , env.getUserName());
     iOps.setProperty(REPLICATION_PEER_PASSWORD.getKey() + instName, env.getPassword());
+    // Tweak some replication parameters to make the replication go faster
+    iOps.setProperty(MASTER_REPLICATION_SCAN_INTERVAL.getKey(), "1s");
+    iOps.setProperty(REPLICATION_WORK_ASSIGNMENT_SLEEP.getKey(), "1s");
+    iOps.setProperty(REPLICATION_WORK_PROCESSOR_DELAY.getKey(), "1s");
+    iOps.setProperty(REPLICATION_WORK_PROCESSOR_PERIOD.getKey(), "1s");
 
+    // Ensure the replication table is online
     ReplicationTable.setOnline(c);
     boolean online = ReplicationTable.isOnline(c);
     for (int i = 0; i < 10; i++) {
@@ -81,6 +89,7 @@ public class Replication extends Test {
     }
     assertTrue("Replication table was not online", online);
 
+    // Make a source and destination table
     final String sourceTable = ("repl-source-" + UUID.randomUUID()).replace('-', '_');
     final String destTable = ("repl-dest-" + UUID.randomUUID()).replace('-', '_');
     final String tables[] = new String[] { sourceTable, destTable };
@@ -89,13 +98,16 @@ public class Replication extends Test {
       log.debug("creating " + tableName);
       tOps.create(tableName);
     }
-    // zookeeper propagation wait
-    UtilWaitThread.sleep(5 * 1000);
 
+    // Point the source to the destination
     final String destID = tOps.tableIdMap().get(destTable);
     tOps.setProperty(sourceTable, TABLE_REPLICATION.getKey(), "true");
     tOps.setProperty(sourceTable, TABLE_REPLICATION_TARGET.getKey() + instName, destID);
 
+    // zookeeper propagation wait
+    UtilWaitThread.sleep(5 * 1000);
+
+    // Maybe split the tables
     Random rand = new Random(System.currentTimeMillis());
     for (String tableName : tables) {
       if (rand.nextBoolean()) {
@@ -103,22 +115,44 @@ public class Replication extends Test {
       }
     }
 
-    final BatchWriter bw = c.createBatchWriter(sourceTable, null);
-    final int ROWS = 1000;
-    final int COLS = 50;
+    // write some checkable data
+    BatchWriter bw = c.createBatchWriter(sourceTable, null);
     for (int row = 0; row < ROWS; row++) {
-      Mutation m = new Mutation(Integer.toString(row));
+      Mutation m = new Mutation(itos(row));
       for (int col = 0; col < COLS; col++) {
-        m.put("", Integer.toString(col), "");
+        m.put("", itos(col), "");
       }
       bw.addMutation(m);
     }
     bw.close();
-    UtilWaitThread.sleep(1000);
 
-    c.replicationOperations().drain(sourceTable);
+    // attempt to force the WAL to roll so replication begins
+    final Set<String> origRefs = c.replicationOperations().referencedFiles(sourceTable);
+    // write some data we will ignore
+    while (true) {
+      final Set<String> updatedFileRefs = c.replicationOperations().referencedFiles(sourceTable);
+      updatedFileRefs.retainAll(origRefs);
+      log.debug("updateFileRefs size " + updatedFileRefs.size());
+      if (updatedFileRefs.isEmpty()) {
+        break;
+      }
+      bw = c.createBatchWriter(sourceTable, null);
+      for (int row = 0; row < ROWS; row++) {
+        Mutation m = new Mutation(itos(row));
+        for (int col = 0; col < COLS; col++) {
+          m.put("ignored", itos(col), "");
+        }
+        bw.addMutation(m);
+      }
+      bw.close();
+    }
+
+    // wait a little while for replication to take place
+    UtilWaitThread.sleep(30 * 1000);
 
+    // check the data
     Scanner scanner = c.createScanner(destTable, Authorizations.EMPTY);
+    scanner.fetchColumnFamily(new Text(""));
     int row = 0;
     int col = 0;
     for (Entry<Key,Value> entry : scanner) {
@@ -131,28 +165,35 @@ public class Replication extends Test {
       }
     }
     assertEquals(ROWS, row);
-    assertEquals(COLS, col);
+    assertEquals(0, col);
 
+    // cleanup
     for (String tableName : tables) {
       log.debug("Deleting " + tableName);
       tOps.delete(tableName);
     }
   }
 
+  // junit isn't a dependency
   private void assertEquals(int expected, int actual) {
     if (expected != actual)
       throw new RuntimeException(String.format("%d fails to match expected value %d", actual, expected));
   }
 
+  // junit isn't a dependency
   private void assertTrue(String string, boolean test) {
       if (!test)
         throw new RuntimeException(string);
   }
 
+  private static String itos(int i) {
+    return String.format("%08d", i);
+  }
+
   private void splitTable(TableOperations tOps, String tableName) throws Exception {
     SortedSet<Text> splits = new TreeSet<>();
     for (int i = 1; i <= 9; i++) {
-      splits.add(new Text(Integer.toString(i)));
+      splits.add(new Text(itos(i * (ROWS / 10))));
     }
     log.debug("Adding splits to " + tableName);
     tOps.addSplits(tableName, splits);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ca64067f/test/src/test/java/org/apache/accumulo/test/replication/ReplicationRandomWalkIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationRandomWalkIT.java b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationRandomWalkIT.java
index e91d5ac..43d1f20 100644
--- a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationRandomWalkIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationRandomWalkIT.java
@@ -16,19 +16,31 @@
  */
 package org.apache.accumulo.test.replication;
 
+import static org.apache.accumulo.core.conf.Property.TSERV_ARCHIVE_WALOGS;
+import static org.apache.accumulo.core.conf.Property.TSERV_WALOG_MAX_SIZE;
+
 import java.util.Properties;
 
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
 import org.apache.accumulo.test.functional.ConfigurableMacIT;
 import org.apache.accumulo.test.randomwalk.Environment;
 import org.apache.accumulo.test.randomwalk.concurrent.Replication;
+import org.apache.hadoop.conf.Configuration;
 import org.junit.Test;
 
 public class ReplicationRandomWalkIT extends ConfigurableMacIT {
 
-  @Test
+  @Override
+  protected void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+    cfg.setProperty(TSERV_ARCHIVE_WALOGS, "false");
+    cfg.setProperty(TSERV_WALOG_MAX_SIZE, "1M");
+    cfg.setNumTservers(1);
+  }
+
+  @Test(timeout = 5 * 60 * 1000)
   public void runReplicationRandomWalkStep() throws Exception {
     Replication r = new Replication();
 


Mime
View raw message