accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From els...@apache.org
Subject [48/50] [abbrv] git commit: ACCUMULO-2583 Advertise peer master coordinator service port in ZK.
Date Fri, 09 May 2014 15:29:26 GMT
ACCUMULO-2583 Advertise peer master coordinator service port in ZK.

Need to use the proper contact info for the peer master. Some more configuration
Properties for tweaking things. Better logging.


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/e84879c8
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/e84879c8
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/e84879c8

Branch: refs/heads/ACCUMULO-378
Commit: e84879c8dd814e1f00c9b109f095a45015224c7f
Parents: 53fc90f
Author: Josh Elser <elserj@apache.org>
Authored: Thu May 8 21:56:45 2014 -0400
Committer: Josh Elser <elserj@apache.org>
Committed: Thu May 8 21:56:45 2014 -0400

----------------------------------------------------------------------
 .../org/apache/accumulo/core/Constants.java     |  1 +
 .../core/client/impl/ReplicationClient.java     | 39 +++++++++++++++++---
 .../org/apache/accumulo/core/conf/Property.java | 10 ++++-
 .../java/org/apache/accumulo/master/Master.java | 16 +++++++-
 .../replication/ReplicationWorkAssigner.java    |  7 +++-
 .../apache/accumulo/tserver/TabletServer.java   |  2 +-
 .../replication/AccumuloReplicaSystem.java      | 20 +++++++---
 .../test/replication/ReplicationIT.java         | 34 +++++++++++------
 8 files changed, 100 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/e84879c8/core/src/main/java/org/apache/accumulo/core/Constants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/Constants.java b/core/src/main/java/org/apache/accumulo/core/Constants.java
index c87119c..f230690 100644
--- a/core/src/main/java/org/apache/accumulo/core/Constants.java
+++ b/core/src/main/java/org/apache/accumulo/core/Constants.java
@@ -43,6 +43,7 @@ public class Constants {
   public static final String ZMASTERS = "/masters";
   public static final String ZMASTER_LOCK = ZMASTERS + "/lock";
   public static final String ZMASTER_GOAL_STATE = ZMASTERS + "/goal_state";
+  public static final String ZMASTER_REPLICATION_COORDINATOR_PORT = ZMASTERS + "/repl_coord_port";
 
   public static final String ZGC = "/gc";
   public static final String ZGC_LOCK = ZGC + "/lock";

http://git-wip-us.apache.org/repos/asf/accumulo/blob/e84879c8/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationClient.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationClient.java
b/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationClient.java
index df12ae8..65565f8 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationClient.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationClient.java
@@ -20,22 +20,29 @@ import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 
 import java.net.UnknownHostException;
+import java.nio.charset.StandardCharsets;
 import java.util.List;
 
+import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
-import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.replication.thrift.ReplicationCoordinator;
 import org.apache.accumulo.core.replication.thrift.ReplicationServicer;
 import org.apache.accumulo.core.util.ThriftUtil;
 import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.zookeeper.ZooReader;
 import org.apache.thrift.TServiceClient;
 import org.apache.thrift.transport.TTransportException;
+import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.net.HostAndPort;
+
 public class ReplicationClient {
   private static final Logger log = LoggerFactory.getLogger(ReplicationClient.class);
 
@@ -65,21 +72,41 @@ public class ReplicationClient {
       return null;
     }
 
-    String master = locations.get(0);
-    if (master.endsWith(":0"))
+    // This is the master thrift service, we just want the hostname, not the port
+    String masterThriftService = locations.get(0);
+    if (masterThriftService.endsWith(":0"))
+      return null;
+
+    
+    AccumuloConfiguration conf = ServerConfigurationUtil.getConfiguration(instance);
+
+    HostAndPort masterAddr = HostAndPort.fromString(masterThriftService);
+    String zkPath = ZooUtil.getRoot(instance) + Constants.ZMASTER_REPLICATION_COORDINATOR_PORT;
+    String replCoordinatorPort;
+
+    // Get the coordinator port for the master we're trying to connect to
+    try {
+      ZooReader reader = new ZooReader(instance.getZooKeepers(), instance.getZooKeepersSessionTimeOut());
+      replCoordinatorPort = new String(reader.getData(zkPath, null), StandardCharsets.UTF_8);
+    } catch (KeeperException | InterruptedException e) {
+      log.debug("Could not fetch remote coordinator port");
       return null;
+    }
+
+    // Throw the hostname and port through HostAndPort to get some normalization
+    HostAndPort coordinatorAddr = HostAndPort.fromParts(masterAddr.getHostText(), Integer.parseInt(replCoordinatorPort));
 
     try {
       // Master requests can take a long time: don't ever time out
-      ReplicationCoordinator.Client client = ThriftUtil.getClientNoTimeout(new ReplicationCoordinator.Client.Factory(),
master,
-          ServerConfigurationUtil.getConfiguration(instance));
+      ReplicationCoordinator.Client client = ThriftUtil.getClientNoTimeout(new ReplicationCoordinator.Client.Factory(),
coordinatorAddr.toString(),
+          conf);
       return client;
     } catch (TTransportException tte) {
       if (tte.getCause().getClass().equals(UnknownHostException.class)) {
         // do not expect to recover from this
         throw new RuntimeException(tte);
       }
-      log.debug("Failed to connect to master={}, will retry... ", master, tte);
+      log.debug("Failed to connect to master coordinator service ({}), will retry... ", coordinatorAddr.toString(),
tte);
       return null;
     }
   }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/e84879c8/core/src/main/java/org/apache/accumulo/core/conf/Property.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index d611bd5..a87b1b4 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -182,6 +182,12 @@ public enum Property {
   @Experimental
   MASTER_REPLICATION_SCAN_INTERVAL("master.replication.status.scan.interval", "30s", PropertyType.TIMEDURATION,
       "Amount of time to sleep before scanning the status section of the replication table
for new data"),
+  @Experimental
+  MASTER_REPLICATION_COORDINATOR_PORT("master.replication.coordinator.port", "10001", PropertyType.PORT,
"Port for the replication coordinator service"),
+  @Experimental
+  MASTER_REPLICATION_COORDINATOR_MINTHREADS("master.replication.coordinator.minthreads",
"4", PropertyType.COUNT, "Minimum number of threads dedicated to answering coordinator requests"),
+  @Experimental
+  MASTER_REPLICATION_COORDINATOR_THREADCHECK("master.replication.coordinator.threadcheck.time",
"5s", PropertyType.TIMEDURATION, "The time between adjustments of the coordinator thread pool"),
 
   // properties that are specific to tablet server behavior
   TSERV_PREFIX("tserver.", null, PropertyType.PREFIX, "Properties in this category affect
the behavior of the tablet servers"),
@@ -457,13 +463,15 @@ public enum Property {
   @Experimental
   REPLICATION_WORKER_THREADS("replication.worker.threads", "4", PropertyType.COUNT, "Size
of the threadpool that each tabletserver devotes to replicating data"),
   @Experimental
-  REPLICATION_RECEIPT_SERVICE_PORT("replication.receipt.service.port", "10001", PropertyType.PORT,
"Listen port used by thrift service in tserver listening for replication"),
+  REPLICATION_RECEIPT_SERVICE_PORT("replication.receipt.service.port", "10002", PropertyType.PORT,
"Listen port used by thrift service in tserver listening for replication"),
   @Experimental
   REPLICATION_WORK_ATTEMPTS("replication.work.attempts", "10", PropertyType.COUNT, "Number
of attempts to try to replicate some data before giving up and letting it naturally be retried
later"),
   @Experimental
   REPLICATION_MIN_THREADS("replication.receiver.min.threads", "1", PropertyType.COUNT, "Minimum
number of threads for replciation"),
   @Experimental
   REPLICATION_THREADCHECK("replication.receiver.threadcheck.time", "5s", PropertyType.TIMEDURATION,
"The time between adjustments of the replication thread pool."),
+  @Experimental
+  REPLICATION_MAX_UNIT_SIZE("replication.max.unit.size", "64M", PropertyType.MEMORY, "Maximum
size of data to send in a replication message"),
 
   ;
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/e84879c8/server/master/src/main/java/org/apache/accumulo/master/Master.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/Master.java b/server/master/src/main/java/org/apache/accumulo/master/Master.java
index 0eac9ab..b0745eb 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/Master.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/Master.java
@@ -60,6 +60,7 @@ import org.apache.accumulo.core.master.thrift.TabletServerStatus;
 import org.apache.accumulo.core.metadata.MetadataTable;
 import org.apache.accumulo.core.metadata.RootTable;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
+import org.apache.accumulo.core.replication.thrift.ReplicationCoordinator;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.security.Credentials;
 import org.apache.accumulo.core.security.NamespacePermission;
@@ -76,6 +77,7 @@ import org.apache.accumulo.fate.zookeeper.ZooLock.LockLossReason;
 import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
 import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
 import org.apache.accumulo.master.recovery.RecoveryManager;
+import org.apache.accumulo.master.replication.MasterReplicationCoordinator;
 import org.apache.accumulo.master.replication.ReplicationDriver;
 import org.apache.accumulo.master.replication.ReplicationWorkAssigner;
 import org.apache.accumulo.master.state.TableCounts;
@@ -977,8 +979,6 @@ public class Master implements LiveTServerSet.Listener, TableObserver,
CurrentSt
       throw new IOException(e);
     }
 
-    
-
     Processor<Iface> processor = new Processor<Iface>(TraceWrap.service(new MasterClientServiceHandler(this)));
     ServerAddress sa = TServerUtils.startServer(getSystemConfiguration(), hostname, Property.MASTER_CLIENTPORT,
processor, "Master",
         "Master Client Service Handler", null, Property.MASTER_MINTHREADS, Property.MASTER_THREADCHECK,
Property.GENERAL_MAX_MESSAGE_SIZE);
@@ -990,6 +990,17 @@ public class Master implements LiveTServerSet.Listener, TableObserver,
CurrentSt
     while (!clientService.isServing()) {
       UtilWaitThread.sleep(100);
     }
+
+    // Start the replication coordinator which assigns tservers to service replication requests
+    ReplicationCoordinator.Processor<ReplicationCoordinator.Iface> replicationCoordinatorProcessor
= new ReplicationCoordinator.Processor<ReplicationCoordinator.Iface>(
+        TraceWrap.service(new MasterReplicationCoordinator(this, getSystemConfiguration())));
+    ServerAddress replAddress = TServerUtils.startServer(getSystemConfiguration(), hostname,
Property.MASTER_REPLICATION_COORDINATOR_PORT, replicationCoordinatorProcessor, "Master Replication
Coordinator",
+        "Replication Coordinator", null, Property.MASTER_REPLICATION_COORDINATOR_MINTHREADS,
Property.MASTER_REPLICATION_COORDINATOR_THREADCHECK, Property.GENERAL_MAX_MESSAGE_SIZE);
+
+    // Advertise that port we used so peers don't have to be told what it is
+    ZooReaderWriter.getInstance().putPersistentData(ZooUtil.getRoot(instance) + Constants.ZMASTER_REPLICATION_COORDINATOR_PORT,
+        Integer.toString(replAddress.address.getPort()).getBytes(StandardCharsets.UTF_8),
NodeExistsPolicy.OVERWRITE);
+    
     while (clientService.isServing()) {
       UtilWaitThread.sleep(500);
     }
@@ -1000,6 +1011,7 @@ public class Master implements LiveTServerSet.Listener, TableObserver,
CurrentSt
     statusThread.join(remaining(deadline));
     replicationWorkAssigner.join(remaining(deadline));
     replicationWorkDriver.join(remaining(deadline));
+    replAddress.server.stop();
 
     // quit, even if the tablet servers somehow jam up and the watchers
     // don't stop

http://git-wip-us.apache.org/repos/asf/accumulo/blob/e84879c8/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationWorkAssigner.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationWorkAssigner.java
b/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationWorkAssigner.java
index 24842a9..1dd20da 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationWorkAssigner.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationWorkAssigner.java
@@ -152,6 +152,8 @@ public class ReplicationWorkAssigner extends Daemon {
 
   @Override
   public void run() {
+    log.info("Starting replication work assignment thread");
+
     while (master.stillMaster()) {
       if (null == conf) {
         conf = master.getConfiguration().getConfiguration();
@@ -178,7 +180,9 @@ public class ReplicationWorkAssigner extends Daemon {
       // Keep the state of the work we queued correct
       cleanupFinishedWork();
 
-      UtilWaitThread.sleep(conf.getTimeInMillis(Property.REPLICATION_WORK_ASSIGNMENT_SLEEP));
+      long sleepTime = conf.getTimeInMillis(Property.REPLICATION_WORK_ASSIGNMENT_SLEEP);
+      log.debug("Sleeping {} ms", sleepTime);
+      UtilWaitThread.sleep(sleepTime);
     }
   }
 
@@ -205,6 +209,7 @@ public class ReplicationWorkAssigner extends Daemon {
         // to add more work entries
         if (queuedWork.size() > maxQueueSize) {
           log.warn("Queued replication work exceeds configured maximum ({}), sleeping to
allow work to occur", maxQueueSize);
+          UtilWaitThread.sleep(5000);
           return;
         }
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/e84879c8/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 803419c..0b5c6bf 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -3122,7 +3122,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
   }
 
   private HostAndPort startReplicationService() throws UnknownHostException {
-    ReplicationServicer.Iface repl = TraceWrap.service(new ReplicationServicerHandler());
+    ReplicationServicer.Iface repl = TraceWrap.service(new ReplicationServicerHandler(this));
     ReplicationServicer.Processor<ReplicationServicer.Iface> processor = new ReplicationServicer.Processor<ReplicationServicer.Iface>(repl);
     AccumuloConfiguration conf = getSystemConfiguration();
     Property maxMessageSizeProperty = (conf.get(Property.TSERV_MAX_MESSAGE_SIZE) != null
? Property.TSERV_MAX_MESSAGE_SIZE : Property.GENERAL_MAX_MESSAGE_SIZE);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/e84879c8/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
index 40676f7..f275cdb 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
@@ -131,21 +131,29 @@ public class AccumuloReplicaSystem implements ReplicaSystem {
       Long entriesReplicated;
       //TODO should chunk up the given file into some configurable sizes instead of just
sending the entire file all at once
       //     configuration should probably just be size based.
-      final long sizeLimit = Long.MAX_VALUE;
+      final long sizeLimit = conf.getMemoryInBytes(Property.REPLICATION_MAX_UNIT_SIZE);
       try {
         entriesReplicated = ReplicationClient.executeServicerWithReturn(peerInstance, peerTserver,
new ClientExecReturn<Long,ReplicationServicer.Client>() {
           @Override
           public Long execute(Client client) throws Exception {
             // RFiles have an extension, call everything else a WAL
             if (p.getName().endsWith(RFILE_SUFFIX)) {
-              return client.replicateKeyValues(remoteTableId, getKeyValues(p, status, sizeLimit));
+              KeyValues kvs = getKeyValues(p, status, sizeLimit);
+              if (0 < kvs.getKeyValuesSize()) {
+                return client.replicateKeyValues(remoteTableId, kvs);
+              }
             } else {
-              return client.replicateLog(remoteTableId, getWalEdits(p, status, sizeLimit));
+              WalEdits edits = getWalEdits(p, status, sizeLimit);
+              if (0 < edits.getEditsSize()) {
+                return client.replicateLog(remoteTableId, edits);
+              }
             }
+
+            return 0l;
           }
         });
 
-        log.debug("Replicated {} entries from {} to {} which is a part of {}", entriesReplicated,
p, peerTserver, peerInstance.getInstanceName());
+        log.debug("Replicated {} entries from {} to {} which is a member of the peer '{}'",
entriesReplicated, p, peerTserver, peerInstance.getInstanceName());
 
         // Update the begin to account for what we replicated
         Status updatedStatus = Status.newBuilder(status).setBegin(status.getBegin() + entriesReplicated).build();
@@ -195,7 +203,7 @@ public class AccumuloReplicaSystem implements ReplicaSystem {
         key.readFields(wal);
         value.readFields(wal);
       } catch (EOFException e) {
-        log.trace("Caught EOFException, no more data to replicate");
+        log.debug("Caught EOFException, no more data to replicate");
         break;
       }
 
@@ -217,7 +225,7 @@ public class AccumuloReplicaSystem implements ReplicaSystem {
       }
     }
 
-    log.debug("Returning {} bytes of WAL entries for replication for {}", size, p);
+    log.debug("Binned {} bytes of WAL entries for replication to peer '{}'", size, p);
 
     return edits;
   }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/e84879c8/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
index 0d4099c..0ad8066 100644
--- a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
@@ -33,9 +33,12 @@ import org.apache.accumulo.server.replication.ReplicationTable;
 import org.apache.accumulo.test.functional.ConfigurableMacIT;
 import org.apache.accumulo.tserver.replication.AccumuloReplicaSystem;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RawLocalFileSystem;
 import org.junit.Assert;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.protobuf.TextFormat;
 
@@ -43,13 +46,16 @@ import com.google.protobuf.TextFormat;
  * 
  */
 public class ReplicationIT extends ConfigurableMacIT {
+  private static final Logger log = LoggerFactory.getLogger(ReplicationIT.class);
 
   @Override
   public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
     cfg.setNumTservers(1);
-    cfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "1M");
+    cfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "32M");
     cfg.setProperty(Property.GC_CYCLE_START, "1s");
-    cfg.setProperty(Property.GC_CYCLE_DELAY, "1s");
+    cfg.setProperty(Property.GC_CYCLE_DELAY, "5s");
+    cfg.setProperty(Property.REPLICATION_WORK_ASSIGNMENT_SLEEP, "5s");
+    cfg.setProperty(Property.REPLICATION_MAX_UNIT_SIZE, "8M");
     hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
   }
 
@@ -59,7 +65,8 @@ public class ReplicationIT extends ConfigurableMacIT {
         ROOT_PASSWORD);
     peerCfg.setNumTservers(1);
     peerCfg.setInstanceName("peer");
-    peerCfg.setProperty(Property.REPLICATION_RECEIPT_SERVICE_PORT, "10002");
+    peerCfg.setProperty(Property.MASTER_REPLICATION_COORDINATOR_PORT, "10003");
+    peerCfg.setProperty(Property.REPLICATION_RECEIPT_SERVICE_PORT, "10004");
     MiniAccumuloClusterImpl peerCluster = peerCfg.build();
 
     peerCluster.start();
@@ -89,13 +96,15 @@ public class ReplicationIT extends ConfigurableMacIT {
 
     // Write some data to table1
     BatchWriter bw = connMaster.createBatchWriter(masterTable, new BatchWriterConfig());
-    for (int rows = 0; rows < 250; rows++) {
-      Mutation m = new Mutation(Integer.toString(rows));
-      for (int cols = 0; cols < 500; cols++) {
-        String value = Integer.toString(cols);
-        m.put(value, "", value);
+    for (int i = 0; i < 100; i++) {
+      for (int rows = 0; rows < 1000; rows++) {
+        Mutation m = new Mutation(i + "_" + Integer.toString(rows));
+        for (int cols = 0; cols < 400; cols++) {
+          String value = Integer.toString(cols);
+          m.put(value, "", value);
+        }
+        bw.addMutation(m);
       }
-      bw.addMutation(m);
     }
 
     bw.close();
@@ -104,17 +113,18 @@ public class ReplicationIT extends ConfigurableMacIT {
       Thread.sleep(500);
     }
 
+    connMaster.tableOperations().compact(masterTable, null, null, true, false);
     for (int i = 0; i < 10; i++) {
-      
       Scanner s = ReplicationTable.getScanner(connMaster);
       for (Entry<Key,Value> e : s) {
-        log.info(e.getKey().toStringNoTruncate() + " " + TextFormat.shortDebugString(Status.parseFrom(e.getValue().get())));
+        Path p = new Path(e.getKey().getRow().toString());
+        log.info(p.getName() + " " + e.getKey().getColumnFamily() + " " + e.getKey().getColumnQualifier()
+ " " + TextFormat.shortDebugString(Status.parseFrom(e.getValue().get())));
       }
 
       log.info("");
       log.info("");
 
-      Thread.sleep(1000);
+      Thread.sleep(3000);
     }
 
     peerCluster.stop();


Mime
View raw message