accumulo-commits mailing list archives

From els...@apache.org
Subject [40/50] [abbrv] git commit: ACCUMULO-2819 More logging to help debug the SequentialWorkAssigner. I think tests are fully functional again.
Date Wed, 21 May 2014 01:59:59 GMT
ACCUMULO-2819 More logging to help debug the SequentialWorkAssigner. I think tests are fully functional again.


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/d5c863de
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/d5c863de
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/d5c863de

Branch: refs/heads/ACCUMULO-378
Commit: d5c863de85d94d61ab784687435b183c2d5d4966
Parents: dfa0fd3
Author: Josh Elser <elserj@apache.org>
Authored: Tue May 20 01:26:20 2014 -0400
Committer: Josh Elser <elserj@apache.org>
Committed: Tue May 20 01:26:20 2014 -0400

----------------------------------------------------------------------
 .../core/client/impl/ReplicationClient.java     |  2 +-
 .../client/impl/ReplicationOperationsImpl.java  |  4 +-
 .../server/util/MasterMetadataUtil.java         |  1 +
 .../gc/GarbageCollectWriteAheadLogs.java        |  4 +-
 .../RemoveCompleteReplicationRecords.java       | 50 ++++++----
 .../replication/SequentialWorkAssigner.java     | 39 +++++---
 .../master/replication/StatusMaker.java         |  2 +
 .../accumulo/master/replication/WorkMaker.java  |  2 +
 .../org/apache/accumulo/tserver/Tablet.java     |  2 +-
 .../replication/ReplicationProcessor.java       |  2 +
 .../test/replication/ReplicationIT.java         | 99 +++-----------------
 .../replication/ReplicationSequentialIT.java    | 33 +++----
 test/src/test/resources/log4j.properties        |  2 +-
 13 files changed, 105 insertions(+), 137 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/d5c863de/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationClient.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationClient.java
b/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationClient.java
index 762e74d..55d2208 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationClient.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationClient.java
@@ -89,7 +89,7 @@ public class ReplicationClient {
       ZooReader reader = new ZooReader(instance.getZooKeepers(), instance.getZooKeepersSessionTimeOut());
       replCoordinatorAddr = new String(reader.getData(zkPath, null), StandardCharsets.UTF_8);
     } catch (KeeperException | InterruptedException e) {
-      log.debug("Could not fetch remote coordinator port");
+      log.error("Could not fetch remote coordinator port");
       return null;
     }
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/d5c863de/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java
b/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java
index 8ee09cb..373a51b 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java
@@ -205,12 +205,12 @@ public class ReplicationOperationsImpl implements ReplicationOperations {
           file = rowHolder.toString();
         }
 
-        log.debug("Evaluating if {} is still needed", file);
-
         // Skip files that we didn't observe when we started (new files/data)
         if (!relevantLogs.contains(file)) {
           log.debug("Found file that we didn't care about {}", file);
           continue;
+        } else {
+          log.debug("Found file that we *do* care about {}", file);
         }
 
         try {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/d5c863de/server/base/src/main/java/org/apache/accumulo/server/util/MasterMetadataUtil.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/MasterMetadataUtil.java
b/server/base/src/main/java/org/apache/accumulo/server/util/MasterMetadataUtil.java
index cecfceb..05e1a0b 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/MasterMetadataUtil.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/MasterMetadataUtil.java
@@ -309,6 +309,7 @@ public class MasterMetadataUtil {
     }
     if (unusedWalLogs != null) {
       for (String entry : unusedWalLogs) {
+        log.info("Removed WAL " + entry + " from " + extent, new Exception());
         m.putDelete(LogColumnFamily.NAME, new Text(entry));
       }
     }
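
The added log line above leans on a common debugging trick: passing a throw-away new Exception() as the Throwable argument makes the logger print the call site's stack trace without anything being thrown. A minimal sketch of the idiom (class name and method shape are illustrative, not part of the commit):

    import org.apache.log4j.Logger;

    public class WalRemovalTrace {
      private static final Logger log = Logger.getLogger(WalRemovalTrace.class);

      void removeWalReference(String wal, String extent) {
        // The Exception is never thrown; the logger renders its stack trace,
        // showing exactly which code path deleted the WAL reference.
        log.info("Removed WAL " + wal + " from " + extent, new Exception());
      }
    }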

http://git-wip-us.apache.org/repos/asf/accumulo/blob/d5c863de/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
----------------------------------------------------------------------
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
index 950bc12..c551c6c 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
@@ -43,6 +43,7 @@ import org.apache.accumulo.core.gc.thrift.GCStatus;
 import org.apache.accumulo.core.gc.thrift.GcCycleStats;
 import org.apache.accumulo.core.metadata.MetadataTable;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection;
+import org.apache.accumulo.core.protobuf.ProtobufUtil;
 import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
 import org.apache.accumulo.core.replication.StatusUtil;
 import org.apache.accumulo.core.replication.proto.Replication.Status;
@@ -74,7 +75,6 @@ import org.slf4j.LoggerFactory;
 import com.google.common.collect.Iterables;
 import com.google.common.net.HostAndPort;
 import com.google.protobuf.InvalidProtocolBufferException;
-import com.google.protobuf.TextFormat;
 
 public class GarbageCollectWriteAheadLogs {
   private static final Logger log = LoggerFactory.getLogger(GarbageCollectWriteAheadLogs.class);
@@ -400,7 +400,7 @@ public class GarbageCollectWriteAheadLogs {
     for (Entry<Key,Value> entry : iter) {
       try {
         Status status = Status.parseFrom(entry.getValue().get());
-        log.info("Checking if {} is safe for removal with {}", wal, TextFormat.shortDebugString(status));
+        log.info("Checking if {} is safe for removal with {}", wal, ProtobufUtil.toString(status));
         if (!StatusUtil.isSafeForRemoval(status)) {
           return true;
         }
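
This hunk swaps TextFormat.shortDebugString for the ProtobufUtil.toString helper, keeping protobuf rendering in log messages behind one utility. A hedged sketch of what such a helper can look like (the real implementation lives in org.apache.accumulo.core.protobuf.ProtobufUtil and may differ):

    import com.google.protobuf.Message;
    import com.google.protobuf.TextFormat;

    // Hypothetical single-line renderer for protobuf messages in logs
    public final class ProtobufUtilSketch {
      public static String toString(Message msg) {
        // shortDebugString prints the message on one line, log-friendly
        return "[" + TextFormat.shortDebugString(msg) + "]";
      }
    }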

http://git-wip-us.apache.org/repos/asf/accumulo/blob/d5c863de/server/master/src/main/java/org/apache/accumulo/master/replication/RemoveCompleteReplicationRecords.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/RemoveCompleteReplicationRecords.java
b/server/master/src/main/java/org/apache/accumulo/master/replication/RemoveCompleteReplicationRecords.java
index 35ce374..dd5ccd8 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/replication/RemoveCompleteReplicationRecords.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/replication/RemoveCompleteReplicationRecords.java
@@ -17,8 +17,12 @@
 package org.apache.accumulo.master.replication;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.Map.Entry;
 import java.util.SortedMap;
 
@@ -143,9 +147,9 @@ public class RemoveCompleteReplicationRecords implements Runnable {
     }
 
     Mutation m = new Mutation(row);
-    Status status = null;
-    long closedTime = -1l;
+    Map<String,Long> tableToTimeClosed = new HashMap<>();
     for (Entry<Key,Value> entry : columns.entrySet()) {
+      Status status = null;
       try {
         status = Status.parseFrom(entry.getValue().get());
       } catch (InvalidProtocolBufferException e) {
@@ -158,37 +162,51 @@ public class RemoveCompleteReplicationRecords implements Runnable {
         return 0l;
       }
 
-      if (status.hasClosedTime()) {
-        if (closedTime == -1) {
-          closedTime = status.getClosedTime();
-        } else if (closedTime != status.getClosedTime()) {
-          log.warn("Inconsistent closed time for {}, values seen: {} and {}", row, closedTime,
status.getClosedTime());
-        }
-      }
-
       Key k = entry.getKey();
       k.getColumnFamily(colf);
       k.getColumnQualifier(colq);
 
       m.putDelete(colf, colq);
 
+      String tableId;
+      if (StatusSection.NAME.equals(colf)) {
+        tableId = colq.toString();
+      } else if (WorkSection.NAME.equals(colf)) {
+        ReplicationTarget target = ReplicationTarget.from(colq);
+        tableId = target.getSourceTableId();
+      } else {
+        throw new RuntimeException("Got unexpected column");
+      }
+
+      if (status.hasClosedTime()) {
+        Long timeClosed = tableToTimeClosed.get(tableId);
+        if (null == timeClosed) {
+          tableToTimeClosed.put(tableId, status.getClosedTime());
+        } else if (timeClosed != status.getClosedTime()){
+          log.warn("Found multiple values for timeClosed for {}: {} and {}", row, timeClosed,
status.getClosedTime());
+        }
+      }
+
       recordsRemoved++;
     }
 
     log.info("Removing {} from the replication table", row);
 
-    ReplicationTarget target = ReplicationTarget.from(colq);
-
-    Mutation orderMutation = OrderSection.createMutation(row.toString(), status.getClosedTime());
-    log.info("Deleting {} from order section with tableID {}", new Key(new Text(orderMutation.getRow())).toStringNoTruncate(),
target.getSourceTableId());
-    orderMutation.putDelete(OrderSection.NAME, new Text(target.getSourceTableId()));
+    List<Mutation> mutations = new ArrayList<>();
+    mutations.add(m);
+    for (Entry<String,Long> entry : tableToTimeClosed.entrySet()) {
+      log.info("Removing order mutation for table {} at {} for {}", entry.getKey(), entry.getValue(),
row.toString());
+      Mutation orderMutation = OrderSection.createMutation(row.toString(), entry.getValue());
+      orderMutation.putDelete(OrderSection.NAME, new Text(entry.getKey()));
+      mutations.add(orderMutation);
+    }
 
     // Send the mutation deleting all the columns at once.
     // If we send them not as a single Mutation, we run the risk of having some of them be applied
     // which would mean that we might accidentally re-replicate data. We want to get rid of them all at once
     // or not at all.
     try {
-      bw.addMutations(Arrays.asList(m, orderMutation));
+      bw.addMutations(mutations);
       bw.flush();
     } catch (MutationsRejectedException e) {
       log.error("Could not submit mutation to remove columns for {} in replication table",
row, e);
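
The comment in this hunk carries the key invariant: the Status/Work deletes and the Order deletes must be handed to the BatchWriter together and then flushed, so a rejection is detected before anything downstream assumes the records are gone. A hedged sketch of that submit-and-flush pattern (method shape is illustrative):

    import java.util.List;
    import org.apache.accumulo.core.client.BatchWriter;
    import org.apache.accumulo.core.client.MutationsRejectedException;
    import org.apache.accumulo.core.data.Mutation;

    boolean submitTogether(BatchWriter bw, List<Mutation> deletes) {
      try {
        // One addMutations call for all related deletes, then flush so any
        // server-side rejection surfaces here, synchronously.
        bw.addMutations(deletes);
        bw.flush();
        return true;
      } catch (MutationsRejectedException e) {
        // Caller should treat the records as possibly still present
        return false;
      }
    }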

http://git-wip-us.apache.org/repos/asf/accumulo/blob/d5c863de/server/master/src/main/java/org/apache/accumulo/master/replication/SequentialWorkAssigner.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/SequentialWorkAssigner.java
b/server/master/src/main/java/org/apache/accumulo/master/replication/SequentialWorkAssigner.java
index 67b652b..a2212a3 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/replication/SequentialWorkAssigner.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/replication/SequentialWorkAssigner.java
@@ -112,6 +112,13 @@ public class SequentialWorkAssigner extends AbstractWorkAssigner {
     // Get the maximum number of entries we want to queue work for (or the default)
     this.maxQueueSize = conf.getCount(Property.REPLICATION_MAX_WORK_QUEUE);
 
+    for (Entry<String,Map<String,String>> peer : this.queuedWorkByPeerName.entrySet()) {
+      log.info("In progress replications for {}", peer.getKey());
+      for (Entry<String,String> tableRepl : peer.getValue().entrySet()) {
+        log.info("Replicating {} for table ID {}", tableRepl.getValue(), tableRepl.getKey());
+      }
+    }
+
     // Scan over the work records, adding the work to the queue
     createWork();
 
@@ -221,7 +228,6 @@ public class SequentialWorkAssigner extends AbstractWorkAssigner {
     try {
       s = ReplicationTable.getScanner(conn);
     } catch (TableNotFoundException e) {
-      UtilWaitThread.sleep(1000);
       return;
     }
 
@@ -233,7 +239,6 @@ public class SequentialWorkAssigner extends AbstractWorkAssigner {
       // to add more work entries
       if (queuedWorkByPeerName.size() > maxQueueSize) {
         log.warn("Queued replication work exceeds configured maximum ({}), sleeping to allow
work to occur", maxQueueSize);
-        UtilWaitThread.sleep(5000);
         return;
       }
 
@@ -241,6 +246,8 @@ public class SequentialWorkAssigner extends AbstractWorkAssigner {
       OrderSection.getTableId(orderEntry.getKey(), buffer);
       String sourceTableId = buffer.toString();
 
+      log.info("Determining if {} from {} needs to be replicated", file, sourceTableId);
+
       Scanner workScanner;
       try {
         workScanner = ReplicationTable.getScanner(conn);
@@ -274,23 +281,28 @@ public class SequentialWorkAssigner extends AbstractWorkAssigner {
           queuedWorkByPeerName.put(target.getPeerName(), queuedWorkForPeer);
         }
 
-        // If there is work to do
-        if (isWorkRequired(status)) {
-          Path p = new Path(file);
-          String filename = p.getName();
-          String key = getQueueKey(filename, target);
+        Path p = new Path(file);
+        String filename = p.getName();
+        String key = getQueueKey(filename, target);
 
-          // Get the file (if any) currently being replicated to the given peer for the given source table
-          String fileBeingReplicated = queuedWorkForPeer.get(sourceTableId);
+        // Get the file (if any) currently being replicated to the given peer for the given source table
+        String keyBeingReplicated = queuedWorkForPeer.get(sourceTableId);
 
-          if (null == fileBeingReplicated) {
+        // If there is work to do
+        if (isWorkRequired(status)) {
+          if (null == keyBeingReplicated) {
             // If there is no file, submit this one for replication
             newReplicationTasksSubmitted += queueWork(key, file, sourceTableId, queuedWorkForPeer);
           } else {
-            log.debug("Not queueing {} for work as {} must be replicated to {} first", file,
fileBeingReplicated, target.getPeerName());
+            log.debug("Not queueing {} for work as {} must be replicated to {} first", file,
keyBeingReplicated, target.getPeerName());
           }
         } else {
           log.debug("Not queueing work for {} because [{}] doesn't need replication", file,
ProtobufUtil.toString(status));
+          if (key.equals(keyBeingReplicated)) {
+            log.debug("Removing {} from replication state to {} because replication is complete",
keyBeingReplicated, target.getPeerName());
+            queuedWorkForPeer.remove(sourceTableId);
+            log.debug("State after removing element: {}", this.queuedWorkByPeerName);
+          }
         }
       }
 
@@ -327,6 +339,7 @@ public class SequentialWorkAssigner extends AbstractWorkAssigner {
     final Iterator<Entry<String,Map<String,String>>> queuedWork = queuedWorkByPeerName.entrySet().iterator();
     final String instanceId = conn.getInstance().getInstanceID();
 
+    int elementsRemoved = 0;
     // Check the status of all the work we've queued up
     while (queuedWork.hasNext()) {
       // {peer -> {tableId -> workKey, tableId -> workKey, ... }, peer -> ...}
@@ -343,9 +356,13 @@ public class SequentialWorkAssigner extends AbstractWorkAssigner {
         Entry<String,String> entry = iter.next();
         // Null equates to the work for this target was finished
        if (null == zooCache.get(ZooUtil.getRoot(instanceId) + Constants.ZREPLICATION_WORK_QUEUE + "/" + entry.getValue())) {
+          log.debug("Removing {} from work assignment state", entry.getValue());
           iter.remove();
+          elementsRemoved++;
         }
       }
     }
+
+    log.info("Removed {} elements from internal workqueue state because the work was complete",
elementsRemoved);
   }
 }
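
Taken together, the logging added in this file traces one piece of state: queuedWorkByPeerName maps each peer to a map of source table ID to the single queue key currently in flight, which is what makes the assigner sequential per table and peer. A condensed, hypothetical sketch of that bookkeeping (the workRequired flag and the queue key stand in for the real isWorkRequired/getQueueKey logic):

    import java.util.HashMap;
    import java.util.Map;

    class SequentialBookkeeping {
      // {peerName -> {sourceTableId -> queued work key}}
      private final Map<String,Map<String,String>> queuedWorkByPeerName = new HashMap<>();

      void offer(String peer, String sourceTableId, String key, boolean workRequired) {
        Map<String,String> queuedWorkForPeer = queuedWorkByPeerName.get(peer);
        if (null == queuedWorkForPeer) {
          queuedWorkForPeer = new HashMap<>();
          queuedWorkByPeerName.put(peer, queuedWorkForPeer);
        }

        String keyBeingReplicated = queuedWorkForPeer.get(sourceTableId);
        if (workRequired) {
          if (null == keyBeingReplicated) {
            // Nothing in flight for this table and peer: queue this file
            queuedWorkForPeer.put(sourceTableId, key);
          }
          // Otherwise defer: files must replicate in order per source table
        } else if (key.equals(keyBeingReplicated)) {
          // Replication finished; free the slot so the next file can queue
          queuedWorkForPeer.remove(sourceTableId);
        }
      }
    }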

http://git-wip-us.apache.org/repos/asf/accumulo/blob/d5c863de/server/master/src/main/java/org/apache/accumulo/master/replication/StatusMaker.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/StatusMaker.java
b/server/master/src/main/java/org/apache/accumulo/master/replication/StatusMaker.java
index 0de7cc3..6bc5962 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/replication/StatusMaker.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/replication/StatusMaker.java
@@ -197,6 +197,8 @@ public class StatusMaker {
         log.warn("Status record ({}) for {} in table {} was written to metadata table which
was closed but lacked closedTime", ProtobufUtil.toString(stat), file, tableId);
       }
 
+      log.info("Creating order record for {} for {} with {}", file, tableId, ProtobufUtil.toString(stat));
+
       Mutation m = OrderSection.createMutation(file.toString(), stat.getClosedTime());
       OrderSection.add(m, tableId, value);
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/d5c863de/server/master/src/main/java/org/apache/accumulo/master/replication/WorkMaker.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/WorkMaker.java
b/server/master/src/main/java/org/apache/accumulo/master/replication/WorkMaker.java
index 2dfddc2..856153d 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/replication/WorkMaker.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/replication/WorkMaker.java
@@ -125,6 +125,8 @@ public class WorkMaker {
           } finally {
             workSpan.stop();
           }
+        } else {
+          log.warn("No configured targets for table with ID {}", tableId);
         }
       }
     } finally {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/d5c863de/server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java
index 855d9ef..418f679 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java
@@ -891,7 +891,7 @@ public class Tablet {
              tabletServer.getClientAddressString(), tabletServer.getLock(), unusedWalLogs, lastLocation, flushId);
 
           // Mark that we have data we want to replicate
-          // This WAL could still be in use by other Tablets though, so we can mark that there is data to replicate,
+          // This WAL could still be in use by other Tablets *from the same table*, so we can only mark that there is data to replicate,
           // but it is *not* closed
           if (replicate) {
            ReplicationTableUtil.updateFiles(SystemCredentials.get(), extent, logFileOnly, StatusUtil.openWithUnknownLength());
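
The reworded comment above captures the protocol subtlety: a WAL is shared by every tablet of a table on that server, so at this point the replication record can only be marked "open" (data exists, final length unknown); closing happens later, once the WAL is retired. A hypothetical sketch of what an open status record amounts to (field names follow the Replication.Status protobuf referenced elsewhere in this commit; the infiniteEnd mapping is an assumption on my part):

    import org.apache.accumulo.core.replication.proto.Replication.Status;

    // Open record: replicate what is there, but the WAL is still growing,
    // so the end offset is unbounded and closed/closedTime are unset.
    Status open = Status.newBuilder()
        .setBegin(0)
        .setEnd(0)
        .setInfiniteEnd(true) // assumption: "unknown length" maps to an unbounded end
        .setClosed(false)
        .build();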

http://git-wip-us.apache.org/repos/asf/accumulo/blob/d5c863de/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java
index 2b8496c..481b3e8 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java
@@ -118,6 +118,8 @@ public class ReplicationProcessor implements Processor {
       throw new RuntimeException(e);
     }
 
+    log.debug("Replicating {} to {} using {}", filePath, target, replica.getClass().getName());
+
     // Replicate that sucker
     Status replicatedStatus = replica.replicate(filePath, status, target);
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/d5c863de/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
index a8b6bbc..91f3581 100644
--- a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
@@ -18,6 +18,7 @@ package org.apache.accumulo.test.replication;
 
 import java.util.Iterator;
 import java.util.Map.Entry;
+import java.util.Set;
 
 import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.BatchWriterConfig;
@@ -66,90 +67,6 @@ public class ReplicationIT extends ConfigurableMacIT {
     hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
   }
 
-  @Test
-  public void dataIsReplicatedAfterCompaction() throws Exception {
-
-    MiniAccumuloConfigImpl peerCfg = new MiniAccumuloConfigImpl(createTestDir(this.getClass().getName() + "_" + this.testName.getMethodName() + "_peer"),
-        ROOT_PASSWORD);
-    peerCfg.setNumTservers(1);
-    peerCfg.setInstanceName("peer");
-    peerCfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "5M");
-    peerCfg.setProperty(Property.MASTER_REPLICATION_COORDINATOR_PORT, "10003");
-    peerCfg.setProperty(Property.REPLICATION_RECEIPT_SERVICE_PORT, "10004");
-    peerCfg.setProperty(Property.REPLICATION_WORK_ASSIGNMENT_SLEEP, "1s");
-    peerCfg.setProperty(Property.MASTER_REPLICATION_SCAN_INTERVAL, "1s");
-    peerCfg.setProperty(Property.REPLICATION_NAME, "peer");
-    MiniAccumuloClusterImpl peerCluster = peerCfg.build();
-
-    peerCluster.start();
-
-    Connector connMaster = getConnector();
-    Connector connPeer = peerCluster.getConnector("root", ROOT_PASSWORD);
-
-    String peerClusterName = "peer";
-
-    // ...peer = AccumuloReplicaSystem,instanceName,zookeepers
-    connMaster.instanceOperations().setProperty(
-        Property.REPLICATION_PEERS.getKey() + peerClusterName,
-        ReplicaSystemFactory.getPeerConfigurationValue(AccumuloReplicaSystem.class,
-            AccumuloReplicaSystem.buildConfiguration(peerCluster.getInstanceName(), peerCluster.getZooKeepers())));
-
-    String masterTable = "master", peerTable = "peer";
-
-    connMaster.tableOperations().create(masterTable);
-    String masterTableId = connMaster.tableOperations().tableIdMap().get(masterTable);
-    Assert.assertNotNull(masterTableId);
-
-    connPeer.tableOperations().create(peerTable);
-    String peerTableId = connPeer.tableOperations().tableIdMap().get(peerTable);
-    Assert.assertNotNull(peerTableId);
-
-    // Replicate this table to the peerClusterName in a table with the peerTableId table id
-    connMaster.tableOperations().setProperty(masterTable, Property.TABLE_REPLICATION.getKey(), "true");
-    connMaster.tableOperations().setProperty(masterTable, Property.TABLE_REPLICATION_TARGETS.getKey() + peerClusterName, peerTableId);
-
-    // Write some data to table1
-    BatchWriter bw = connMaster.createBatchWriter(masterTable, new BatchWriterConfig());
-    for (int rows = 0; rows < 5000; rows++) {
-      Mutation m = new Mutation(Integer.toString(rows));
-      for (int cols = 0; cols < 100; cols++) {
-        String value = Integer.toString(cols);
-        m.put(value, "", value);
-      }
-      bw.addMutation(m);
-    }
-
-    bw.close();
-
-    log.info("Wrote all data to master cluster");
-
-    connMaster.tableOperations().compact(masterTable, null, null, true, true);
-
-    Thread.sleep(5000);
-
-    for (Entry<Key,Value> kv : connMaster.createScanner(ReplicationTable.NAME, Authorizations.EMPTY)) {
-      log.debug(kv.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(kv.getValue().get())));
-    }
-
-    connMaster.replicationOperations().drain(masterTable);
-
-    try {
-      Scanner master = connMaster.createScanner(masterTable, Authorizations.EMPTY), peer = connPeer.createScanner(peerTable, Authorizations.EMPTY);
-      Iterator<Entry<Key,Value>> masterIter = master.iterator(), peerIter = peer.iterator();
-      while (masterIter.hasNext() && peerIter.hasNext()) {
-        Entry<Key,Value> masterEntry = masterIter.next(), peerEntry = peerIter.next();
-        Assert.assertEquals(peerEntry.getKey() + " was not equal to " + peerEntry.getKey(), 0,
-            masterEntry.getKey().compareTo(peerEntry.getKey(), PartialKey.ROW_COLFAM_COLQUAL_COLVIS));
-        Assert.assertEquals(masterEntry.getValue(), peerEntry.getValue());
-      }
-  
-      Assert.assertFalse("Had more data to read from the master", masterIter.hasNext());
-      Assert.assertFalse("Had more data to read from the peer", peerIter.hasNext());
-    } finally {
-      peerCluster.stop();
-    }
-  }
-
   @Test(timeout = 60 * 5000)
   public void dataWasReplicatedToThePeer() throws Exception {
    MiniAccumuloConfigImpl peerCfg = new MiniAccumuloConfigImpl(createTestDir(this.getClass().getName() + "_" + this.testName.getMethodName() + "_peer"),
@@ -206,6 +123,8 @@ public class ReplicationIT extends ConfigurableMacIT {
 
     log.info("Wrote all data to master cluster");
 
+    Set<String> files = connMaster.replicationOperations().referencedFiles(masterTable);
+
     for (ProcessReference proc : cluster.getProcesses().get(ServerType.TABLET_SERVER)) {
       cluster.killProcess(ServerType.TABLET_SERVER, proc);
     }
@@ -219,7 +138,7 @@ public class ReplicationIT extends ConfigurableMacIT {
       log.debug(kv.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(kv.getValue().get())));
     }
 
-    connMaster.replicationOperations().drain(masterTable);
+    connMaster.replicationOperations().drain(masterTable, files);
 
    Scanner master = connMaster.createScanner(masterTable, Authorizations.EMPTY), peer = connPeer.createScanner(peerTable, Authorizations.EMPTY);
     Iterator<Entry<Key,Value>> masterIter = master.iterator(), peerIter = peer.iterator();
@@ -321,8 +240,14 @@ public class ReplicationIT extends ConfigurableMacIT {
         Thread.sleep(500);
       }
 
-      connMaster.tableOperations().compact(masterTable1, null, null, true, false);
-      connMaster.tableOperations().compact(masterTable2, null, null, true, false);
+
+      for (ProcessReference proc : cluster.getProcesses().get(ServerType.TABLET_SERVER)) {
+        cluster.killProcess(ServerType.TABLET_SERVER, proc);
+      }
+
+      cluster.exec(TabletServer.class);
+      // connMaster.tableOperations().compact(masterTable1, null, null, true, false);
+      // connMaster.tableOperations().compact(masterTable2, null, null, true, false);
 
       // Wait until we fully replicated something
       boolean fullyReplicated = false;
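
The drain change above is the substantive fix in this test: referencedFiles is captured while the original WALs are known to hold the data, and drain is then scoped to exactly that set, so files created after the tserver restart cannot extend the wait. A hedged usage sketch (connector setup elided):

    import java.util.Set;
    import org.apache.accumulo.core.client.Connector;

    void drainSnapshot(Connector connMaster, String masterTable) throws Exception {
      // Snapshot the files currently referenced for replication...
      Set<String> files = connMaster.replicationOperations().referencedFiles(masterTable);

      // ...restart tservers, compact, etc. ...

      // ...then block only until that snapshot has replicated.
      connMaster.replicationOperations().drain(masterTable, files);
    }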

http://git-wip-us.apache.org/repos/asf/accumulo/blob/d5c863de/test/src/test/java/org/apache/accumulo/test/replication/ReplicationSequentialIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationSequentialIT.java
b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationSequentialIT.java
index f1d25ae..ea7aaa2 100644
--- a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationSequentialIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationSequentialIT.java
@@ -39,11 +39,8 @@ import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.metadata.MetadataTable;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection;
 import org.apache.accumulo.core.protobuf.ProtobufUtil;
-import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
-import org.apache.accumulo.core.replication.StatusUtil;
 import org.apache.accumulo.core.replication.proto.Replication.Status;
 import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.master.replication.SequentialWorkAssigner;
 import org.apache.accumulo.minicluster.ServerType;
 import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl;
@@ -66,7 +63,7 @@ public class ReplicationSequentialIT extends ConfigurableMacIT {
   private static final Logger log = LoggerFactory.getLogger(ReplicationSequentialIT.class);
 
   private ExecutorService executor;
-  
+
   @Before
   public void setup() {
     executor = Executors.newSingleThreadExecutor();
@@ -87,6 +84,8 @@ public class ReplicationSequentialIT extends ConfigurableMacIT {
     cfg.setProperty(Property.GC_CYCLE_DELAY, "5s");
     cfg.setProperty(Property.REPLICATION_WORK_ASSIGNMENT_SLEEP, "1s");
     cfg.setProperty(Property.MASTER_REPLICATION_SCAN_INTERVAL, "1s");
+    cfg.setProperty(Property.MASTER_REPLICATION_COORDINATOR_PORT, "0");
+    cfg.setProperty(Property.REPLICATION_RECEIPT_SERVICE_PORT, "0");
     cfg.setProperty(Property.REPLICATION_MAX_UNIT_SIZE, "8M");
     cfg.setProperty(Property.REPLICATION_NAME, "master");
     cfg.setProperty(Property.REPLICATION_WORK_ASSIGNER, SequentialWorkAssigner.class.getName());
@@ -100,8 +99,8 @@ public class ReplicationSequentialIT extends ConfigurableMacIT {
     peerCfg.setNumTservers(1);
     peerCfg.setInstanceName("peer");
     peerCfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "5M");
-    peerCfg.setProperty(Property.MASTER_REPLICATION_COORDINATOR_PORT, "10003");
-    peerCfg.setProperty(Property.REPLICATION_RECEIPT_SERVICE_PORT, "10004");
+    peerCfg.setProperty(Property.MASTER_REPLICATION_COORDINATOR_PORT, "0");
+    peerCfg.setProperty(Property.REPLICATION_RECEIPT_SERVICE_PORT, "0");
     peerCfg.setProperty(Property.REPLICATION_WORK_ASSIGNMENT_SLEEP, "1s");
     peerCfg.setProperty(Property.MASTER_REPLICATION_SCAN_INTERVAL, "1s");
     peerCfg.setProperty(Property.REPLICATION_NAME, "peer");
@@ -169,7 +168,8 @@ public class ReplicationSequentialIT extends ConfigurableMacIT {
     cluster.exec(TabletServer.class);
 
     log.info("TabletServer restarted");
-    for (@SuppressWarnings("unused") Entry<Key,Value> e : ReplicationTable.getScanner(connMaster)) {}
+    for (@SuppressWarnings("unused")
+    Entry<Key,Value> e : ReplicationTable.getScanner(connMaster)) {}
     log.info("TabletServer is online");
 
     log.info("");
@@ -196,7 +196,7 @@ public class ReplicationSequentialIT extends ConfigurableMacIT {
         log.info("Drain completed");
         return true;
       }
-      
+
     });
 
     try {
@@ -237,7 +237,7 @@ public class ReplicationSequentialIT extends ConfigurableMacIT {
 
     log.info("Last master entry: " + masterEntry);
     log.info("Last peer entry: " + peerEntry);
-    
+
     Assert.assertFalse("Had more data to read from the master", masterIter.hasNext());
     Assert.assertFalse("Had more data to read from the peer", peerIter.hasNext());
 
@@ -251,8 +251,8 @@ public class ReplicationSequentialIT extends ConfigurableMacIT {
     peerCfg.setNumTservers(1);
     peerCfg.setInstanceName("peer");
     peerCfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "5M");
-    peerCfg.setProperty(Property.MASTER_REPLICATION_COORDINATOR_PORT, "10003");
-    peerCfg.setProperty(Property.REPLICATION_RECEIPT_SERVICE_PORT, "10004");
+    peerCfg.setProperty(Property.MASTER_REPLICATION_COORDINATOR_PORT, "0");
+    peerCfg.setProperty(Property.REPLICATION_RECEIPT_SERVICE_PORT, "0");
     peerCfg.setProperty(Property.REPLICATION_WORK_ASSIGNMENT_SLEEP, "1s");
     peerCfg.setProperty(Property.MASTER_REPLICATION_SCAN_INTERVAL, "1s");
     peerCfg.setProperty(Property.REPLICATION_NAME, "peer");
@@ -300,7 +300,7 @@ public class ReplicationSequentialIT extends ConfigurableMacIT {
 
       // Write some data to table1
       BatchWriter bw = connMaster.createBatchWriter(masterTable1, new BatchWriterConfig());
-      long masterTable1Records = 0l; 
+      long masterTable1Records = 0l;
       for (int rows = 0; rows < 2500; rows++) {
         Mutation m = new Mutation(masterTable1 + rows);
         for (int cols = 0; cols < 100; cols++) {
@@ -330,13 +330,13 @@ public class ReplicationSequentialIT extends ConfigurableMacIT {
 
       log.info("Wrote all data to master cluster");
 
+      Set<String> filesFor1 = connMaster.replicationOperations().referencedFiles(masterTable1), filesFor2 = connMaster.replicationOperations().referencedFiles(masterTable2);
+
       while (!connMaster.tableOperations().exists(ReplicationTable.NAME)) {
         Thread.sleep(500);
       }
 
-      Set<String> filesFor1 = connMaster.replicationOperations().referencedFiles(masterTable1), filesFor2 = connMaster.replicationOperations().referencedFiles(
-          masterTable2);
-
       // Restart the tserver to force a close on the WAL
      for (ProcessReference proc : cluster.getProcesses().get(ServerType.TABLET_SERVER)) {
         cluster.killProcess(ServerType.TABLET_SERVER, proc);
@@ -346,7 +346,8 @@ public class ReplicationSequentialIT extends ConfigurableMacIT {
       log.info("Restarted the tserver");
 
       // Read the data -- the tserver is back up and running
-      for (@SuppressWarnings("unused") Entry<Key,Value> entry : connMaster.createScanner(masterTable1, Authorizations.EMPTY)) {}
+      for (@SuppressWarnings("unused")
+      Entry<Key,Value> entry : connMaster.createScanner(masterTable1, Authorizations.EMPTY)) {}
 
       // Wait for both tables to be replicated
       log.info("Waiting for {} for {}", filesFor1, masterTable1);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/d5c863de/test/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/test/src/test/resources/log4j.properties b/test/src/test/resources/log4j.properties
index 11ff405..407bc28 100644
--- a/test/src/test/resources/log4j.properties
+++ b/test/src/test/resources/log4j.properties
@@ -21,7 +21,7 @@ log4j.appender.CA.layout.ConversionPattern=%d{ISO8601} [%c{2}] %-5p: %m%n
 log4j.logger.org.apache.accumulo.core=DEBUG
 log4j.logger.org.apache.accumulo.core.client.impl.MasterClient=INFO
 log4j.logger.org.apache.accumulo.core.client.impl.ServerClient=ERROR
-log4j.logger.org.apache.accumulo.core.util.shell.Shell.audit=off
+log4j.logger.org.apache.accumulo.core.util.shell.Shell.audit=OFF
 log4j.logger.org.apache.accumulo.core.util.shell.Shell=FATAL
 log4j.logger.org.apache.commons.vfs2.impl.DefaultFileSystemManager=WARN
 log4j.logger.org.apache.hadoop.io.compress.CodecPool=WARN

