accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From els...@apache.org
Subject [25/35] git commit: ACCUMULO-378 Core review fixes from bhavanki for replication
Date Thu, 05 Jun 2014 04:43:07 GMT
ACCUMULO-378 Core review fixes from bhavanki for replication


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/84e94a42
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/84e94a42
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/84e94a42

Branch: refs/heads/ACCUMULO-378
Commit: 84e94a429bd92e469156642b1bfd69c422759e2d
Parents: 2f02d69
Author: Josh Elser <elserj@apache.org>
Authored: Wed Jun 4 13:52:58 2014 -0400
Committer: Josh Elser <elserj@apache.org>
Committed: Wed Jun 4 13:52:58 2014 -0400

----------------------------------------------------------------------
 .../client/admin/ReplicationOperations.java     | 12 ++---
 .../core/client/impl/ReplicationClient.java     | 34 +++++++-----
 .../client/impl/ReplicationOperationsImpl.java  | 52 ++++++++----------
 .../replication/PeerNotFoundException.java      |  4 ++
 .../core/client/replication/ReplicaSystem.java  |  3 +-
 .../replication/ReplicaSystemFactory.java       |  6 ++-
 .../org/apache/accumulo/core/data/Mutation.java | 13 +++--
 .../master/replication/ReplicationDriver.java   | 13 +++--
 .../test/replication/CyclicReplicationIT.java   | 43 +++++++--------
 .../UnorderedWorkAssignerReplicationIT.java     | 57 ++++++++++----------
 10 files changed, 128 insertions(+), 109 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/84e94a42/core/src/main/java/org/apache/accumulo/core/client/admin/ReplicationOperations.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/admin/ReplicationOperations.java
b/core/src/main/java/org/apache/accumulo/core/client/admin/ReplicationOperations.java
index 1d20f79..5873f73 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/admin/ReplicationOperations.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/admin/ReplicationOperations.java
@@ -32,14 +32,14 @@ import org.apache.accumulo.core.client.replication.ReplicaSystem;
 public interface ReplicationOperations {
 
   /**
-   * Define a cluster with the given name using the given {@link ReplicaSystem}
+   * Defines a cluster with the given name using the given {@link ReplicaSystem}.
    * @param name Name of the cluster, used for configuring replication on tables
    * @param system Type of system to be replicated to
    */
   public void addPeer(String name, ReplicaSystem system) throws AccumuloException, AccumuloSecurityException,
PeerExistsException;
 
   /**
-   * Define a cluster with the given name and the given name system
+   * Defines a cluster with the given name and the given {@link ReplicaSystem} class name.
    * @param name Unique name for the cluster
    * @param replicaType {@link ReplicaSystem} class name to use to replicate the data
    * @throws PeerExistsException
@@ -47,14 +47,14 @@ public interface ReplicationOperations {
   public void addPeer(String name, String replicaType) throws AccumuloException, AccumuloSecurityException,
PeerExistsException;
 
   /**
-   * Remove a cluster with the given name
+   * Removes a cluster with the given name.
    * @param name Name of the cluster to remove
    * @throws PeerNotFoundException
    */
   public void removePeer(String name) throws AccumuloException, AccumuloSecurityException,
PeerNotFoundException;
 
   /**
-   * Wait for a table to be fully replicated
+   * Waits for a table to be fully replicated.
    * @param tableName The table to wait for
    * @throws AccumuloException
    * @throws AccumuloSecurityException
@@ -62,7 +62,7 @@ public interface ReplicationOperations {
   public void drain(String tableName) throws AccumuloException, AccumuloSecurityException,
TableNotFoundException;
 
   /**
-   * Wait for a table to be fully replicated as determined by the provided tables
+   * Waits for a table to be fully replicated as determined by the provided files.
    * @param tableName The table to wait for
    * @throws AccumuloException
    * @throws AccumuloSecurityException
@@ -70,7 +70,7 @@ public interface ReplicationOperations {
   public void drain(String tableName, Set<String> files) throws AccumuloException,
AccumuloSecurityException, TableNotFoundException;
 
   /**
-   * Get all of the referenced files for a table
+   * Gets all of the referenced files for a table.
    * @param tableName
    * @throws TableNotFoundException
    */

http://git-wip-us.apache.org/repos/asf/accumulo/blob/84e94a42/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationClient.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationClient.java
b/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationClient.java
index d7b12c7..13c027a 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationClient.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationClient.java
@@ -16,7 +16,6 @@
  */
 package org.apache.accumulo.core.client.impl;
 
-import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 
 import java.net.UnknownHostException;
@@ -52,14 +51,19 @@ public class ReplicationClient {
    * @return Client to the ReplicationCoordinator service
    */
   public static ReplicationCoordinator.Client getCoordinatorConnectionWithRetry(Instance
instance) throws AccumuloException {
-    checkArgument(instance != null, "instance is null");
+    checkNotNull(instance);
 
     for (int attempts = 1; attempts <= 10; attempts++) {
 
       ReplicationCoordinator.Client result = getCoordinatorConnection(instance);
       if (result != null)
         return result;
-      UtilWaitThread.sleep(attempts * 250);
+      log.debug("Could not get ReplicationCoordinator connection to {}, will retry", instance.getInstanceName());
+      try {
+        Thread.sleep(attempts * 250);
+      } catch (InterruptedException e) {
+        throw new AccumuloException(e);
+      }
     }
 
     throw new AccumuloException("Timed out trying to communicate with master from " + instance.getInstanceName());
@@ -69,14 +73,16 @@ public class ReplicationClient {
     List<String> locations = instance.getMasterLocations();
 
     if (locations.size() == 0) {
-      log.debug("No masters...");
+      log.debug("No masters for replication to instance {}", instance.getInstanceName());
       return null;
     }
 
     // This is the master thrift service, we just want the hostname, not the port
     String masterThriftService = locations.get(0);
-    if (masterThriftService.endsWith(":0"))
+    if (masterThriftService.endsWith(":0")) {
+      log.warn("Master found for {} did not have real location {}", instance.getInstanceName(),
masterThriftService);
       return null;
+    }
 
 
     AccumuloConfiguration conf = ServerConfigurationUtil.getConfiguration(instance);
@@ -91,7 +97,7 @@ public class ReplicationClient {
       ZooReader reader = new ZooReader(instance.getZooKeepers(), instance.getZooKeepersSessionTimeOut());
       replCoordinatorAddr = new String(reader.getData(zkPath, null), StandardCharsets.UTF_8);
     } catch (KeeperException | InterruptedException e) {
-      log.error("Could not fetch remote coordinator port");
+      log.error("Could not fetch remote coordinator port", e);
       return null;
     }
 
@@ -106,11 +112,7 @@ public class ReplicationClient {
           conf);
       return client;
     } catch (TTransportException tte) {
-      if (tte.getCause().getClass().equals(UnknownHostException.class)) {
-        // do not expect to recover from this
-        throw new RuntimeException(tte);
-      }
-      log.debug("Failed to connect to master coordinator service ({}), will retry... ", coordinatorAddr.toString(),
tte);
+      log.debug("Failed to connect to master coordinator service ({})", coordinatorAddr.toString(),
tte);
       return null;
     }
   }
@@ -157,13 +159,17 @@ public class ReplicationClient {
   public static <T> T executeCoordinatorWithReturn(Instance instance, ClientExecReturn<T,ReplicationCoordinator.Client>
exec) throws AccumuloException,
       AccumuloSecurityException {
     ReplicationCoordinator.Client client = null;
-    while (true) {
+    for (int i = 0; i < 10; i++) {
       try {
         client = getCoordinatorConnectionWithRetry(instance);
         return exec.execute(client);
       } catch (TTransportException tte) {
         log.debug("ReplicationClient coordinator request failed, retrying ... ", tte);
-        UtilWaitThread.sleep(100);
+        try {
+          Thread.sleep(100);
+        } catch (InterruptedException e) {
+          throw new AccumuloException(e);
+        }
       } catch (ThriftSecurityException e) {
         throw new AccumuloSecurityException(e.user, e.code, e);
       } catch (AccumuloException e) {
@@ -175,6 +181,8 @@ public class ReplicationClient {
           close(client);
       }
     }
+
+    throw new AccumuloException("Could not connect to ReplicationCoordinator at " + instance.getInstanceName());
   }
 
   public static void executeCoordinator(Instance instance, ClientExec<ReplicationCoordinator.Client>
exec) throws AccumuloException, AccumuloSecurityException {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/84e94a42/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java
b/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java
index 4355867..51a5367 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java
@@ -66,10 +66,12 @@ import com.google.protobuf.InvalidProtocolBufferException;
 public class ReplicationOperationsImpl implements ReplicationOperations {
   private static final Logger log = LoggerFactory.getLogger(ReplicationOperationsImpl.class);
 
-  private Instance inst;
-  private Credentials creds;
+  private final Instance inst;
+  private final Credentials creds;
 
   public ReplicationOperationsImpl(Instance inst, Credentials creds) {
+    checkNotNull(inst);
+    checkNotNull(creds);
     this.inst = inst;
     this.creds = creds;
   }
@@ -125,32 +127,16 @@ public class ReplicationOperationsImpl implements ReplicationOperations
{
     checkNotNull(tableName);
 
     Connector conn = inst.getConnector(creds.getPrincipal(), creds.getToken());
-    TableOperations tops = conn.tableOperations();
-    while (!tops.exists(ReplicationTable.NAME)) {
-      UtilWaitThread.sleep(200);
-    }
-
-    if (!conn.tableOperations().exists(tableName)) {
-      throw new TableNotFoundException(null, tableName, null);
-    }
-
-    String strTableId = null;
-    while (null == strTableId) {
-      strTableId = tops.tableIdMap().get(tableName);
-      if (null == strTableId) {
-        UtilWaitThread.sleep(200);
-      }
-    }
-
-    Text tableId = new Text(strTableId);
+    Text tableId = getTableId(conn, tableName);
 
     log.info("Waiting for {} to be replicated for {}", wals, tableId);
 
     log.info("Reading from metadata table");
     boolean allMetadataRefsReplicated = false;
+    final Set<Range> range = Collections.singleton(new Range(ReplicationSection.getRange()));
     while (!allMetadataRefsReplicated) {
       BatchScanner bs = conn.createBatchScanner(MetadataTable.NAME, Authorizations.EMPTY,
4);
-      bs.setRanges(Collections.singleton(new Range(ReplicationSection.getRange())));
+      bs.setRanges(range);
       bs.fetchColumnFamily(ReplicationSection.COLF);
       try {
         allMetadataRefsReplicated = allReferencesReplicated(bs, tableId, wals);
@@ -228,13 +214,7 @@ public class ReplicationOperationsImpl implements ReplicationOperations
{
     return true;
   }
 
-  @Override
-  public Set<String> referencedFiles(String tableName) throws AccumuloException, AccumuloSecurityException,
TableNotFoundException {
-    checkNotNull(tableName);
-
-    log.debug("Collecting referenced files for replication of table {}", tableName);
-
-    Connector conn = inst.getConnector(creds.getPrincipal(), creds.getToken());
+  protected Text getTableId(Connector conn, String tableName) throws AccumuloException, AccumuloSecurityException,
TableNotFoundException {
     TableOperations tops = conn.tableOperations();
     while (!tops.exists(ReplicationTable.NAME)) {
       UtilWaitThread.sleep(200);
@@ -252,13 +232,23 @@ public class ReplicationOperationsImpl implements ReplicationOperations
{
       }
     }
 
-    Text tableId = new Text(strTableId);
+    return new Text(strTableId);    
+  }
+
+  @Override
+  public Set<String> referencedFiles(String tableName) throws AccumuloException, AccumuloSecurityException,
TableNotFoundException {
+    checkNotNull(tableName);
+
+    log.debug("Collecting referenced files for replication of table {}", tableName);
+
+    Connector conn = inst.getConnector(creds.getPrincipal(), creds.getToken());
+    Text tableId = getTableId(conn, tableName);
 
-    log.debug("Found id of {} for name {}", strTableId, tableName);
+    log.debug("Found id of {} for name {}", tableId, tableName);
 
     // Get the WALs currently referenced by the table
     BatchScanner metaBs = conn.createBatchScanner(MetadataTable.NAME, Authorizations.EMPTY,
4);
-    metaBs.setRanges(Collections.singleton(MetadataSchema.TabletsSection.getRange(strTableId)));
+    metaBs.setRanges(Collections.singleton(MetadataSchema.TabletsSection.getRange(tableId.toString())));
     metaBs.fetchColumnFamily(LogColumnFamily.NAME);
     Set<String> wals = new HashSet<>();
     try {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/84e94a42/core/src/main/java/org/apache/accumulo/core/client/replication/PeerNotFoundException.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/replication/PeerNotFoundException.java
b/core/src/main/java/org/apache/accumulo/core/client/replication/PeerNotFoundException.java
index 1859c62..4e02218 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/replication/PeerNotFoundException.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/replication/PeerNotFoundException.java
@@ -33,4 +33,8 @@ public class PeerNotFoundException extends Exception {
   public PeerNotFoundException(String message, Throwable cause) {
     super(message, cause);
   }
+
+  public PeerNotFoundException(String peer, String message, Throwable cause) {
+    super("Peer '" + peer + "' not found " + message, cause);
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/84e94a42/core/src/main/java/org/apache/accumulo/core/client/replication/ReplicaSystem.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/replication/ReplicaSystem.java
b/core/src/main/java/org/apache/accumulo/core/client/replication/ReplicaSystem.java
index e20d35f..cc51a11 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/replication/ReplicaSystem.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/replication/ReplicaSystem.java
@@ -31,6 +31,7 @@ public interface ReplicaSystem {
    * @param p Path to the resource we're reading from
    * @param status Information to replicate
    * @param target The peer
+   * @param helper Instance of ReplicaSystemHelper
    * @return A new Status for the progress that was made
    */
   public Status replicate(Path p, Status status, ReplicationTarget target, ReplicaSystemHelper
helper);
@@ -39,7 +40,7 @@ public interface ReplicaSystem {
    * Configure the implementation with necessary information from the system configuration
    * <p>
    * For example, we only need one implementation for Accumulo, but, for each peer,
-   * we have a ZK quorom and instance name
+   * we have a ZK quorum and instance name
    * @param configuration
    */
   public void configure(String configuration);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/84e94a42/core/src/main/java/org/apache/accumulo/core/client/replication/ReplicaSystemFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/replication/ReplicaSystemFactory.java
b/core/src/main/java/org/apache/accumulo/core/client/replication/ReplicaSystemFactory.java
index d1df97e..164512a 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/replication/ReplicaSystemFactory.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/replication/ReplicaSystemFactory.java
@@ -27,6 +27,8 @@ import com.google.common.base.Preconditions;
 public class ReplicaSystemFactory {
   private static final Logger log = LoggerFactory.getLogger(ReplicaSystemFactory.class);
 
+  private ReplicaSystemFactory() {}
+
   /**
    * @param value
    *          {@link ReplicaSystem} implementation class name
@@ -53,10 +55,10 @@ public class ReplicaSystemFactory {
         return rs;
       }
 
-      throw new RuntimeException("Class is not assignable to ReplicaSystem: " + name);
+      throw new IllegalArgumentException("Class is not assignable to ReplicaSystem: " + name);
     } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e)
{
       log.error("Error creating ReplicaSystem object", e);
-      throw new RuntimeException(e);
+      throw new IllegalArgumentException(e);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/84e94a42/core/src/main/java/org/apache/accumulo/core/data/Mutation.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/data/Mutation.java b/core/src/main/java/org/apache/accumulo/core/data/Mutation.java
index 619e522..a134ec8 100644
--- a/core/src/main/java/org/apache/accumulo/core/data/Mutation.java
+++ b/core/src/main/java/org/apache/accumulo/core/data/Mutation.java
@@ -784,6 +784,9 @@ public class Mutation implements Writable {
    * @return An unmodifiable view of the replication sources
    */
   public Set<String> getReplicationSources() {
+    if (null == replicationSources) {
+      return EMPTY;
+    }
     return Collections.unmodifiableSet(replicationSources);
   }
   
@@ -926,9 +929,13 @@ public class Mutation implements Writable {
       }
     }
     if (0x02 == (0x02 & hasValues)) {
-      WritableUtils.writeVInt(out, replicationSources.size());
-      for (String source : replicationSources) {
-        WritableUtils.writeString(out, source);
+      if (null == replicationSources) {
+        WritableUtils.writeVInt(out, 0);
+      } else {
+        WritableUtils.writeVInt(out, replicationSources.size());
+        for (String source : replicationSources) {
+          WritableUtils.writeString(out, source);
+        }
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/84e94a42/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationDriver.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationDriver.java
b/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationDriver.java
index b340009..e98bc1d 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationDriver.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationDriver.java
@@ -27,13 +27,14 @@ import org.apache.accumulo.master.Master;
 import org.apache.accumulo.trace.instrument.CountSampler;
 import org.apache.accumulo.trace.instrument.Sampler;
 import org.apache.accumulo.trace.instrument.Trace;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Daemon wrapper around the {@link WorkMaker} that separates it from the Master
  */
 public class ReplicationDriver extends Daemon {
-  private static final Logger log = Logger.getLogger(ReplicationDriver.class);
+  private static final Logger log = LoggerFactory.getLogger(ReplicationDriver.class);
 
   private final Master master;
   private final AccumuloConfiguration conf;
@@ -95,7 +96,13 @@ public class ReplicationDriver extends Daemon {
       Trace.offNoFlush();
 
       // Sleep for a bit
-      UtilWaitThread.sleep(conf.getTimeInMillis(Property.MASTER_REPLICATION_SCAN_INTERVAL));
+      long sleepMillis = conf.getTimeInMillis(Property.MASTER_REPLICATION_SCAN_INTERVAL);
+      log.debug("Sleeping for {}ms before re-running", sleepMillis);
+      try {
+        Thread.sleep(sleepMillis);
+      } catch (InterruptedException e) {
+        log.error("Interrupted while sleeping", e);
+      }
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/84e94a42/test/src/test/java/org/apache/accumulo/test/replication/CyclicReplicationIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/CyclicReplicationIT.java
b/test/src/test/java/org/apache/accumulo/test/replication/CyclicReplicationIT.java
index a03cfab..a75113b 100644
--- a/test/src/test/java/org/apache/accumulo/test/replication/CyclicReplicationIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/replication/CyclicReplicationIT.java
@@ -121,6 +121,7 @@ public class CyclicReplicationIT {
 
       String master1UserName = "master1", master1Password = "foo";
       String master2UserName = "master2", master2Password = "bar";
+      String master1Table = master1Cluster.getInstanceName(), master2Table = master2Cluster.getInstanceName();
 
       connMaster1.securityOperations().createLocalUser(master1UserName, new PasswordToken(master1Password));
       connMaster2.securityOperations().createLocalUser(master2UserName, new PasswordToken(master2Password));
@@ -142,27 +143,27 @@ public class CyclicReplicationIT {
           ReplicaSystemFactory.getPeerConfigurationValue(AccumuloReplicaSystem.class,
               AccumuloReplicaSystem.buildConfiguration(master1Cluster.getInstanceName(),
master1Cluster.getZooKeepers())));
 
-      connMaster1.tableOperations().create(master1Cluster.getInstanceName(), false);
-      String master1TableId = connMaster1.tableOperations().tableIdMap().get(master1Cluster.getInstanceName());
+      connMaster1.tableOperations().create(master1Table, false);
+      String master1TableId = connMaster1.tableOperations().tableIdMap().get(master1Table);
       Assert.assertNotNull(master1TableId);
 
-      connMaster2.tableOperations().create(master2Cluster.getInstanceName(), false);
-      String master2TableId = connMaster2.tableOperations().tableIdMap().get(master2Cluster.getInstanceName());
+      connMaster2.tableOperations().create(master2Table, false);
+      String master2TableId = connMaster2.tableOperations().tableIdMap().get(master2Table);
       Assert.assertNotNull(master2TableId);
 
       // Replicate master1 in the master1 cluster to master2 in the master2 cluster
-      connMaster1.tableOperations().setProperty(master1Cluster.getInstanceName(), Property.TABLE_REPLICATION.getKey(),
"true");
-      connMaster1.tableOperations().setProperty(master1Cluster.getInstanceName(),
+      connMaster1.tableOperations().setProperty(master1Table, Property.TABLE_REPLICATION.getKey(),
"true");
+      connMaster1.tableOperations().setProperty(master1Table,
           Property.TABLE_REPLICATION_TARGETS.getKey() + master2Cluster.getInstanceName(),
master2TableId);
 
       // Replicate master2 in the master2 cluster to master1 in the master2 cluster
-      connMaster2.tableOperations().setProperty(master2Cluster.getInstanceName(), Property.TABLE_REPLICATION.getKey(),
"true");
-      connMaster2.tableOperations().setProperty(master2Cluster.getInstanceName(),
+      connMaster2.tableOperations().setProperty(master2Table, Property.TABLE_REPLICATION.getKey(),
"true");
+      connMaster2.tableOperations().setProperty(master2Table,
           Property.TABLE_REPLICATION_TARGETS.getKey() + master1Cluster.getInstanceName(),
master1TableId);
 
       // Give our replication user the ability to write to the respective table
-      connMaster1.securityOperations().grantTablePermission(master1UserName, master1Cluster.getInstanceName(),
TablePermission.WRITE);
-      connMaster2.securityOperations().grantTablePermission(master2UserName, master2Cluster.getInstanceName(),
TablePermission.WRITE);
+      connMaster1.securityOperations().grantTablePermission(master1UserName, master1Table,
TablePermission.WRITE);
+      connMaster2.securityOperations().grantTablePermission(master2UserName, master2Table,
TablePermission.WRITE);
 
       IteratorSetting summingCombiner = new IteratorSetting(50, SummingCombiner.class);
       SummingCombiner.setEncodingType(summingCombiner, Type.STRING);
@@ -170,17 +171,17 @@ public class CyclicReplicationIT {
 
       // Set a combiner on both instances that will sum multiple values
       // We can use this to verify that the mutation was not sent multiple times
-      connMaster1.tableOperations().attachIterator(master1Cluster.getInstanceName(), summingCombiner);
-      connMaster2.tableOperations().attachIterator(master2Cluster.getInstanceName(), summingCombiner);
+      connMaster1.tableOperations().attachIterator(master1Table, summingCombiner);
+      connMaster2.tableOperations().attachIterator(master2Table, summingCombiner);
 
       // Write a single entry
-      BatchWriter bw = connMaster1.createBatchWriter(master1Cluster.getInstanceName(), new
BatchWriterConfig());
+      BatchWriter bw = connMaster1.createBatchWriter(master1Table, new BatchWriterConfig());
       Mutation m = new Mutation("row");
       m.put("count", "", "1");
       bw.addMutation(m);
       bw.close();
 
-      Set<String> files = connMaster1.replicationOperations().referencedFiles(master1Cluster.getInstanceName());
+      Set<String> files = connMaster1.replicationOperations().referencedFiles(master1Table);
 
       log.info("Found {} that need replication from master1", files);
 
@@ -194,22 +195,22 @@ public class CyclicReplicationIT {
       log.info("Restarted tserver on master1");
 
       // Sanity check that the element is there on master1
-      Scanner s = connMaster1.createScanner(master1Cluster.getInstanceName(), Authorizations.EMPTY);
+      Scanner s = connMaster1.createScanner(master1Table, Authorizations.EMPTY);
       Entry<Key,Value> entry = Iterables.getOnlyElement(s);
       Assert.assertEquals("1", entry.getValue().toString());
 
       // Wait for this table to replicate
-      connMaster1.replicationOperations().drain(master1Cluster.getInstanceName(), files);
+      connMaster1.replicationOperations().drain(master1Table, files);
 
       Thread.sleep(5000);
 
       // Check that the element made it to master2 only once
-      s = connMaster2.createScanner(master2Cluster.getInstanceName(), Authorizations.EMPTY);
+      s = connMaster2.createScanner(master2Table, Authorizations.EMPTY);
       entry = Iterables.getOnlyElement(s);
       Assert.assertEquals("1", entry.getValue().toString());
 
       // Wait for master2 to finish replicating it back
-      files = connMaster2.replicationOperations().referencedFiles(master2Cluster.getInstanceName());
+      files = connMaster2.replicationOperations().referencedFiles(master2Table);
 
       // Kill and restart the tserver to close the WAL on master2
       for (ProcessReference proc : master2Cluster.getProcesses().get(ServerType.TABLET_SERVER))
{
@@ -219,16 +220,16 @@ public class CyclicReplicationIT {
       master2Cluster.exec(TabletServer.class);
 
       // Check that the element made it to master2 only once
-      s = connMaster2.createScanner(master2Cluster.getInstanceName(), Authorizations.EMPTY);
+      s = connMaster2.createScanner(master2Table, Authorizations.EMPTY);
       entry = Iterables.getOnlyElement(s);
       Assert.assertEquals("1", entry.getValue().toString());
 
-      connMaster2.replicationOperations().drain(master2Cluster.getInstanceName(), files);
+      connMaster2.replicationOperations().drain(master2Table, files);
 
       Thread.sleep(5000);
 
       // Verify that the entry wasn't sent back to master1
-      s = connMaster1.createScanner(master1Cluster.getInstanceName(), Authorizations.EMPTY);
+      s = connMaster1.createScanner(master1Table, Authorizations.EMPTY);
       entry = Iterables.getOnlyElement(s);
       Assert.assertEquals("1", entry.getValue().toString());
     } finally {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/84e94a42/test/src/test/java/org/apache/accumulo/test/replication/UnorderedWorkAssignerReplicationIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/UnorderedWorkAssignerReplicationIT.java
b/test/src/test/java/org/apache/accumulo/test/replication/UnorderedWorkAssignerReplicationIT.java
index 6c21962..d561d2f 100644
--- a/test/src/test/java/org/apache/accumulo/test/replication/UnorderedWorkAssignerReplicationIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/replication/UnorderedWorkAssignerReplicationIT.java
@@ -113,40 +113,40 @@ public class UnorderedWorkAssignerReplicationIT extends ConfigurableMacIT
{
     try {
       final Connector connMaster = getConnector();
       final Connector connPeer = peerCluster.getConnector("root", ROOT_PASSWORD);
-  
+
       ReplicationTable.create(connMaster);
 
       String peerUserName = "peer", peerPassword = "foo";
-  
+
       String peerClusterName = "peer";
 
       connPeer.securityOperations().createLocalUser(peerUserName, new PasswordToken(peerPassword));
-      
+
       connMaster.instanceOperations().setProperty(Property.REPLICATION_PEER_USER.getKey()
+ peerClusterName, peerUserName);
       connMaster.instanceOperations().setProperty(Property.REPLICATION_PEER_PASSWORD.getKey()
+ peerClusterName, peerPassword);
-  
+
       // ...peer = AccumuloReplicaSystem,instanceName,zookeepers
       connMaster.instanceOperations().setProperty(
           Property.REPLICATION_PEERS.getKey() + peerClusterName,
           ReplicaSystemFactory.getPeerConfigurationValue(AccumuloReplicaSystem.class,
               AccumuloReplicaSystem.buildConfiguration(peerCluster.getInstanceName(), peerCluster.getZooKeepers())));
-  
+
       final String masterTable = "master", peerTable = "peer";
-  
+
       connMaster.tableOperations().create(masterTable);
       String masterTableId = connMaster.tableOperations().tableIdMap().get(masterTable);
       Assert.assertNotNull(masterTableId);
-  
+
       connPeer.tableOperations().create(peerTable);
       String peerTableId = connPeer.tableOperations().tableIdMap().get(peerTable);
       Assert.assertNotNull(peerTableId);
 
       connPeer.securityOperations().grantTablePermission(peerUserName, peerTable, TablePermission.WRITE);
-  
+
       // Replicate this table to the peerClusterName in a table with the peerTableId table
id
       connMaster.tableOperations().setProperty(masterTable, Property.TABLE_REPLICATION.getKey(),
"true");
       connMaster.tableOperations().setProperty(masterTable, Property.TABLE_REPLICATION_TARGETS.getKey()
+ peerClusterName, peerTableId);
-  
+
       // Write some data to table1
       BatchWriter bw = connMaster.createBatchWriter(masterTable, new BatchWriterConfig());
       for (int rows = 0; rows < 5000; rows++) {
@@ -157,23 +157,23 @@ public class UnorderedWorkAssignerReplicationIT extends ConfigurableMacIT {
         }
         bw.addMutation(m);
       }
-  
+
       bw.close();
-  
+
       log.info("Wrote all data to master cluster");
-  
+
       final Set<String> filesNeedingReplication = connMaster.replicationOperations().referencedFiles(masterTable);
-  
+
       for (ProcessReference proc : cluster.getProcesses().get(ServerType.TABLET_SERVER)) {
         cluster.killProcess(ServerType.TABLET_SERVER, proc);
       }
       cluster.exec(TabletServer.class);
-  
+
       log.info("TabletServer restarted");
       for (@SuppressWarnings("unused")
       Entry<Key,Value> e : ReplicationTable.getScanner(connMaster)) {}
       log.info("TabletServer is online");
-  
+
       log.info("");
       log.info("Fetching metadata records:");
       for (Entry<Key,Value> kv : connMaster.createScanner(MetadataTable.NAME, Authorizations.EMPTY)) {
@@ -183,33 +183,33 @@ public class UnorderedWorkAssignerReplicationIT extends ConfigurableMacIT {
           log.info(kv.getKey().toStringNoTruncate() + " " + kv.getValue());
         }
       }
-  
+
       log.info("");
       log.info("Fetching replication records:");
       for (Entry<Key,Value> kv : connMaster.createScanner(ReplicationTable.NAME, Authorizations.EMPTY)) {
         log.info(kv.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(kv.getValue().get())));
       }
-  
+
       Future<Boolean> future = executor.submit(new Callable<Boolean>() {
-  
+
         @Override
         public Boolean call() throws Exception {
           connMaster.replicationOperations().drain(masterTable, filesNeedingReplication);
           log.info("Drain completed");
           return true;
         }
-  
+
       });
-  
+
       try {
         future.get(30, TimeUnit.SECONDS);
       } catch (TimeoutException e) {
         future.cancel(true);
         Assert.fail("Drain did not finish within 30 seconds");
       }
-  
+
       log.info("drain completed");
-  
+
       log.info("");
       log.info("Fetching metadata records:");
       for (Entry<Key,Value> kv : connMaster.createScanner(MetadataTable.NAME, Authorizations.EMPTY)) {
@@ -219,13 +219,13 @@ public class UnorderedWorkAssignerReplicationIT extends ConfigurableMacIT {
           log.info(kv.getKey().toStringNoTruncate() + " " + kv.getValue());
         }
       }
-  
+
       log.info("");
       log.info("Fetching replication records:");
       for (Entry<Key,Value> kv : connMaster.createScanner(ReplicationTable.NAME, Authorizations.EMPTY)) {
         log.info(kv.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(kv.getValue().get())));
       }
-  
+
       Scanner master = connMaster.createScanner(masterTable, Authorizations.EMPTY), peer = connPeer.createScanner(peerTable, Authorizations.EMPTY);
       Iterator<Entry<Key,Value>> masterIter = master.iterator(), peerIter = peer.iterator();
       Entry<Key,Value> masterEntry = null, peerEntry = null;
@@ -236,10 +236,10 @@ public class UnorderedWorkAssignerReplicationIT extends ConfigurableMacIT {
             masterEntry.getKey().compareTo(peerEntry.getKey(), PartialKey.ROW_COLFAM_COLQUAL_COLVIS));
         Assert.assertEquals(masterEntry.getValue(), peerEntry.getValue());
       }
-  
+
       log.info("Last master entry: " + masterEntry);
       log.info("Last peer entry: " + peerEntry);
-  
+
       Assert.assertFalse("Had more data to read from the master", masterIter.hasNext());
       Assert.assertFalse("Had more data to read from the peer", peerIter.hasNext());
     } finally {
@@ -377,7 +377,7 @@ public class UnorderedWorkAssignerReplicationIT extends ConfigurableMacIT {
           Assert.assertTrue("Found unexpected key-value" + entry.getKey().toStringNoTruncate() + " " + entry.getValue(), entry.getKey().getRow().toString()
               .startsWith(masterTable1));
         }
-  
+
         log.info("Found {} records in {}", countTable, peerTable1);
 
         if (masterTable1Records != countTable) {
@@ -394,7 +394,7 @@ public class UnorderedWorkAssignerReplicationIT extends ConfigurableMacIT {
           Assert.assertTrue("Found unexpected key-value" + entry.getKey().toStringNoTruncate() + " " + entry.getValue(), entry.getKey().getRow().toString()
               .startsWith(masterTable2));
         }
-  
+
         log.info("Found {} records in {}", countTable, peerTable2);
 
         if (masterTable2Records != countTable) {
@@ -605,7 +605,6 @@ public class UnorderedWorkAssignerReplicationIT extends ConfigurableMacIT {
         Thread.sleep(500);
       }
 
-
       for (ProcessReference proc : cluster.getProcesses().get(ServerType.TABLET_SERVER)) {
         cluster.killProcess(ServerType.TABLET_SERVER, proc);
       }


Mime
View raw message