accumulo-commits mailing list archives

From els...@apache.org
Subject [1/2] git commit: ACCUMULO-2581 Revisit the WorkAssigner impls to make some proper OO hierarchy.
Date Wed, 28 May 2014 05:08:06 GMT
Repository: accumulo
Updated Branches:
  refs/heads/ACCUMULO-378 fa18d9dcf -> 070ceb1da


ACCUMULO-2581 Revisit the WorkAssigner impls to make some proper OO hierarchy.

The SequentialWorkAssigner is nice, and does what it's meant to, but it's
not nearly as fast as what can be achieved with multiple tservers. It's probably
a good idea to keep both a sequential and an unordered work assigner in good
working order.


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/34da6fe9
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/34da6fe9
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/34da6fe9

Branch: refs/heads/ACCUMULO-378
Commit: 34da6fe9bddbebbc19feb2d12f53edcb743a5733
Parents: fa18d9d
Author: Josh Elser <elserj@apache.org>
Authored: Tue May 27 22:55:50 2014 -0400
Committer: Josh Elser <elserj@apache.org>
Committed: Tue May 27 22:55:50 2014 -0400

----------------------------------------------------------------------
 .../replication/AbstractWorkAssigner.java       |  82 -----
 .../DistributedWorkQueueWorkAssigner.java       | 320 +++++++++---------
 .../replication/SequentialWorkAssigner.java     | 330 ++++++-------------
 .../replication/UnorderedWorkAssigner.java      | 170 ++++++++++
 .../replication/AbstractWorkAssignerTest.java   |   7 +-
 .../DistributedWorkQueueWorkAssignerTest.java   | 302 -----------------
 .../replication/SequentialWorkAssignerTest.java |  39 ++-
 .../replication/UnorderedWorkAssignerTest.java  | 293 ++++++++++++++++
 .../monitor/servlets/ReplicationServlet.java    |   4 +-
 .../replication/ReplicationProcessor.java       |   4 +-
 .../replication/ReplicationProcessorTest.java   |   4 +-
 11 files changed, 756 insertions(+), 799 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/34da6fe9/server/base/src/main/java/org/apache/accumulo/server/replication/AbstractWorkAssigner.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/replication/AbstractWorkAssigner.java b/server/base/src/main/java/org/apache/accumulo/server/replication/AbstractWorkAssigner.java
deleted file mode 100644
index 625a36e..0000000
--- a/server/base/src/main/java/org/apache/accumulo/server/replication/AbstractWorkAssigner.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.replication;
-
-import java.util.Map.Entry;
-
-import org.apache.accumulo.core.replication.ReplicationTarget;
-import org.apache.accumulo.core.replication.StatusUtil;
-import org.apache.accumulo.core.replication.proto.Replication.Status;
-import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
-
-/**
- * Common methods for {@link WorkAssigner}s
- */
-public abstract class AbstractWorkAssigner implements WorkAssigner {
-
-  protected boolean isWorkRequired(Status status) {
-    return StatusUtil.isWorkRequired(status);
-  }
-
-  public static final String KEY_SEPARATOR = "|";
-
-  /**
-   * Serialize a filename and a {@link ReplicationTarget} into the expected key format for use with the {@link DistributedWorkQueue}
-   * 
-   * @param filename
-   *          Filename for data to be replicated
-   * @param replTarget
-   *          Information about replication peer
-   * @return Key for identifying work in queue
-   */
-  public static String getQueueKey(String filename, ReplicationTarget replTarget) {
-    return filename + KEY_SEPARATOR + replTarget.getPeerName() + KEY_SEPARATOR + replTarget.getRemoteIdentifier() + KEY_SEPARATOR
-        + replTarget.getSourceTableId();
-  }
-
-  /**
-   * @param queueKey
-   *          Key from the work queue
-   * @return Components which created the queue key
-   */
-  public static Entry<String,ReplicationTarget> fromQueueKey(String queueKey) {
-    Preconditions.checkNotNull(queueKey);
-
-    int index = queueKey.indexOf(KEY_SEPARATOR);
-    if (-1 == index) {
-      throw new IllegalArgumentException("Could not find expected separator in queue key '" + queueKey + "'");
-    }
-
-    String filename = queueKey.substring(0, index);
-
-    int secondIndex = queueKey.indexOf(KEY_SEPARATOR, index + 1);
-    if (-1 == secondIndex) {
-      throw new IllegalArgumentException("Could not find expected separator in queue key '" + queueKey + "'");
-    }
-
-    int thirdIndex = queueKey.indexOf(KEY_SEPARATOR, secondIndex + 1);
-    if (-1 == thirdIndex) {
-      throw new IllegalArgumentException("Could not find expected seperator in queue key '" + queueKey + "'");
-    }
-
-    return Maps.immutableEntry(filename, new ReplicationTarget(queueKey.substring(index + 1, secondIndex), queueKey.substring(secondIndex + 1, thirdIndex),
-        queueKey.substring(thirdIndex + 1)));
-  }
-}
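
For reference, the queue key these helpers serialize and parse is the pipe-delimited string filename|peerName|remoteIdentifier|sourceTableId. Below is a minimal, dependency-free sketch of the round trip; the class name and sample values are made up, and it mirrors the indexOf/substring logic above rather than calling it:

public class QueueKeyRoundTrip {
  static final String SEP = "|";

  public static void main(String[] args) {
    // Serialize: same shape as getQueueKey(filename, target)
    String key = "wal-0001" + SEP + "peer1" + SEP + "5" + SEP + "2";

    // Deserialize: three separators split the key back apart, matching
    // the indexOf/substring walk in fromQueueKey
    int first = key.indexOf(SEP);
    int second = key.indexOf(SEP, first + 1);
    int third = key.indexOf(SEP, second + 1);

    String filename = key.substring(0, first);
    String peerName = key.substring(first + 1, second);
    String remoteId = key.substring(second + 1, third);
    String sourceTableId = key.substring(third + 1);

    System.out.println(filename + " -> " + peerName + "/" + remoteId + "/" + sourceTableId);
    // prints: wal-0001 -> peer1/5/2
  }
}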

http://git-wip-us.apache.org/repos/asf/accumulo/blob/34da6fe9/server/master/src/main/java/org/apache/accumulo/master/replication/DistributedWorkQueueWorkAssigner.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/DistributedWorkQueueWorkAssigner.java b/server/master/src/main/java/org/apache/accumulo/master/replication/DistributedWorkQueueWorkAssigner.java
index 4f883af..0fd5205 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/replication/DistributedWorkQueueWorkAssigner.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/replication/DistributedWorkQueueWorkAssigner.java
@@ -16,106 +16,99 @@
  */
 package org.apache.accumulo.master.replication;
 
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Iterator;
+import java.util.Collection;
 import java.util.Map.Entry;
 import java.util.Set;
 
 import org.apache.accumulo.core.Constants;
-import org.apache.accumulo.core.client.BatchScanner;
 import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.protobuf.ProtobufUtil;
+import org.apache.accumulo.core.replication.ReplicationSchema.OrderSection;
 import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
 import org.apache.accumulo.core.replication.ReplicationTarget;
 import org.apache.accumulo.core.replication.StatusUtil;
 import org.apache.accumulo.core.replication.proto.Replication.Status;
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
-import org.apache.accumulo.server.replication.AbstractWorkAssigner;
 import org.apache.accumulo.server.replication.ReplicationTable;
+import org.apache.accumulo.server.replication.WorkAssigner;
 import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
 import org.apache.accumulo.server.zookeeper.ZooCache;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
-import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
 import com.google.protobuf.InvalidProtocolBufferException;
-import com.google.protobuf.TextFormat;
 
 /**
- * Read work records from the replication table, create work entries for other nodes to complete.
- * <p>
- * Uses the DistributedWorkQueue to make the work available for any tserver. This approach does not consider the locality of the tabletserver performing the
- * work in relation to the data being replicated (local HDFS blocks).
- * <p>
- * The implementation allows for multiple tservers to concurrently replicate data to peer(s), however it is possible that data for a table is replayed on the
- * peer in a different order than the master. The {@link SequentialWorkAssigner} should be used if this must be guaranteed at the cost of replication
- * throughput.
+ * Common methods for {@link WorkAssigner}s
  */
-public class DistributedWorkQueueWorkAssigner extends AbstractWorkAssigner {
+public abstract class DistributedWorkQueueWorkAssigner implements WorkAssigner {
   private static final Logger log = LoggerFactory.getLogger(DistributedWorkQueueWorkAssigner.class);
-  private static final String NAME = "DistributedWorkQueue Replication Work Assigner";
 
-  private Connector conn;
-  private AccumuloConfiguration conf;
+  protected boolean isWorkRequired(Status status) {
+    return StatusUtil.isWorkRequired(status);
+  }
 
-  private DistributedWorkQueue workQueue;
-  private Set<String> queuedWork;
-  private int maxQueueSize;
-  private ZooCache zooCache;
+  protected Connector conn;
+  protected AccumuloConfiguration conf;
+  protected DistributedWorkQueue workQueue;
+  protected int maxQueueSize;
+  protected ZooCache zooCache;
 
-  public DistributedWorkQueueWorkAssigner() {}
+  public static final String KEY_SEPARATOR = "|";
 
-  public DistributedWorkQueueWorkAssigner(AccumuloConfiguration conf, Connector conn) {
-    this.conf = conf;
-    this.conn = conn;
-  }
-
-  @Override
-  public void configure(AccumuloConfiguration conf, Connector conn) {
-    this.conf = conf;
-    this.conn = conn;
+  /**
+   * Serialize a filename and a {@link ReplicationTarget} into the expected key format for use with the {@link DistributedWorkQueue}
+   * 
+   * @param filename
+   *          Filename for data to be replicated
+   * @param replTarget
+   *          Information about replication peer
+   * @return Key for identifying work in queue
+   */
+  public static String getQueueKey(String filename, ReplicationTarget replTarget) {
+    return filename + KEY_SEPARATOR + replTarget.getPeerName() + KEY_SEPARATOR + replTarget.getRemoteIdentifier() + KEY_SEPARATOR
+        + replTarget.getSourceTableId();
   }
 
-  @Override
-  public String getName() {
-    return NAME;
-  }
+  /**
+   * @param queueKey
+   *          Key from the work queue
+   * @return Components which created the queue key
+   */
+  public static Entry<String,ReplicationTarget> fromQueueKey(String queueKey) {
+    Preconditions.checkNotNull(queueKey);
 
-  @Override
-  public void assignWork() {
-    if (null == workQueue) {
-      initializeWorkQueue(conf);
+    int index = queueKey.indexOf(KEY_SEPARATOR);
+    if (-1 == index) {
+      throw new IllegalArgumentException("Could not find expected separator in queue key '" + queueKey + "'");
     }
 
-    if (null == queuedWork) {
-      log.info("Reinitializing state from DistributedWorkQueue in ZooKeeper");
-      initializeQueuedWork();
-    }
+    String filename = queueKey.substring(0, index);
 
-    if (null == zooCache) {
-      zooCache = new ZooCache();
+    int secondIndex = queueKey.indexOf(KEY_SEPARATOR, index + 1);
+    if (-1 == secondIndex) {
+      throw new IllegalArgumentException("Could not find expected separator in queue key '" + queueKey + "'");
     }
 
-    // Get the maximum number of entries we want to queue work for (or the default)
-    this.maxQueueSize = conf.getCount(Property.REPLICATION_MAX_WORK_QUEUE);
-
-    log.info("Creating work entries from replication table");
-    // Scan over the work records, adding the work to the queue
-    createWork();
+    int thirdIndex = queueKey.indexOf(KEY_SEPARATOR, secondIndex + 1);
+    if (-1 == thirdIndex) {
+      throw new IllegalArgumentException("Could not find expected separator in queue key '" + queueKey + "'");
+    }
 
-    log.info("Cleaning up finished work entries from replication table");
-    // Keep the state of the work we queued correct
-    cleanupFinishedWork();
+    return Maps.immutableEntry(filename, new ReplicationTarget(queueKey.substring(index + 1, secondIndex), queueKey.substring(secondIndex + 1, thirdIndex),
+        queueKey.substring(thirdIndex + 1)));
   }
 
   /*
@@ -145,14 +138,6 @@ public class DistributedWorkQueueWorkAssigner extends AbstractWorkAssigner {
     this.workQueue = workQueue;
   }
 
-  protected Set<String> getQueuedWork() {
-    return queuedWork;
-  }
-
-  protected void setQueuedWork(Set<String> queuedWork) {
-    this.queuedWork = queuedWork;
-  }
-
   protected int getMaxQueueSize() {
     return maxQueueSize;
   }
@@ -169,7 +154,6 @@ public class DistributedWorkQueueWorkAssigner extends AbstractWorkAssigner {
     this.zooCache = zooCache;
   }
 
-
   /**
    * Initialize the DistributedWorkQueue using the proper ZK location
    * 
@@ -179,124 +163,162 @@ public class DistributedWorkQueueWorkAssigner extends AbstractWorkAssigner {
     workQueue = new DistributedWorkQueue(ZooUtil.getRoot(conn.getInstance()) + Constants.ZREPLICATION_WORK_QUEUE, conf);
   }
 
-  /**
-   * Initialize the queuedWork set with the work already sent out
-   */
-  protected void initializeQueuedWork() {
-    Preconditions.checkArgument(null == queuedWork, "Expected queuedWork to be null");
-    queuedWork = new HashSet<>();
-    while (true) {
-      try {
-        queuedWork.addAll(workQueue.getWorkQueued());
-        return;
-      } catch (KeeperException e) {
-        if (KeeperException.Code.NONODE.equals(e.code())) {
-          log.warn("Could not find ZK root for replication work queue, will retry", e);
-          UtilWaitThread.sleep(500);
-          continue;
-        }
+  @Override
+  public void configure(AccumuloConfiguration conf, Connector conn) {
+    this.conf = conf;
+    this.conn = conn;
+  }
 
-        log.error("Error reading existing queued replication work from ZooKeeper", e);
-        throw new RuntimeException("Error reading existing queued replication work from ZooKeeper", e);
-      } catch (InterruptedException e) {
-        log.error("Error reading existing queued replication work from ZooKeeper", e);
-        throw new RuntimeException("Error reading existing queued replication work from ZooKeeper", e);
-      }
+  @Override
+  public void assignWork() {
+    if (null == workQueue) {
+      initializeWorkQueue(conf);
+    }
+
+    initializeQueuedWork();
+
+    if (null == zooCache) {
+      zooCache = new ZooCache();
     }
+
+    // Get the maximum number of entries we want to queue work for (or the default)
+    this.maxQueueSize = conf.getCount(Property.REPLICATION_MAX_WORK_QUEUE);
+
+    // Scan over the work records, adding the work to the queue
+    createWork();
+
+    // Keep the state of the work we queued correct
+    cleanupFinishedWork();
   }
 
   /**
    * Scan over the {@link WorkSection} of the replication table adding work for entries that have data to replicate and have not already been queued.
    */
   protected void createWork() {
-    // Create a batchscanner over the replication table's work entries
-    BatchScanner bs;
+    // Create a scanner over the replication table's order entries
+    Scanner s;
     try {
-      bs = ReplicationTable.getBatchScanner(conn, 4);
+      s = ReplicationTable.getScanner(conn);
     } catch (TableNotFoundException e) {
       return;
     }
 
-    WorkSection.limit(bs);
-    bs.setRanges(Collections.singleton(new Range()));
+    OrderSection.limit(s);
+
     Text buffer = new Text();
-    long filesWorkWasCreatedFrom = 0l;
-    try {
-      for (Entry<Key,Value> entry : bs) {
-        // If we're not working off the entries, we need to not shoot ourselves in the foot by continuing
-        // to add more work entries
-        if (queuedWork.size() > maxQueueSize) {
-          log.warn("Queued replication work exceeds configured maximum ({}), sleeping to allow work to occur", maxQueueSize);
-          return;
-        }
+    for (Entry<Key,Value> orderEntry : s) {
+      // If we're not working off the entries, we need to not shoot ourselves in the foot by continuing
+      // to add more work entries
+      if (getQueueSize() > maxQueueSize) {
+        log.warn("Queued replication work exceeds configured maximum ({}), sleeping to allow work to occur", maxQueueSize);
+        return;
+      }
+
+      String file = OrderSection.getFile(orderEntry.getKey(), buffer);
+      OrderSection.getTableId(orderEntry.getKey(), buffer);
+      String sourceTableId = buffer.toString();
+
+      log.info("Determining if {} from {} needs to be replicated", file, sourceTableId);
+
+      Scanner workScanner;
+      try {
+        workScanner = ReplicationTable.getScanner(conn);
+      } catch (TableNotFoundException e) {
+        log.warn("Replication table was deleted. Will retry...");
+        UtilWaitThread.sleep(5000);
+        return;
+      }
+
+      WorkSection.limit(workScanner);
+      workScanner.setRange(Range.exact(file));
 
-        WorkSection.getFile(entry.getKey(), buffer);
-        String file = buffer.toString();
+      int newReplicationTasksSubmitted = 0;
+      // For a file, we can concurrently replicate it to multiple targets
+      for (Entry<Key,Value> workEntry : workScanner) {
         Status status;
         try {
-          status = StatusUtil.fromValue(entry.getValue());
+          status = StatusUtil.fromValue(workEntry.getValue());
         } catch (InvalidProtocolBufferException e) {
-          log.warn("Could not deserialize protobuf from work entry for {}", file, e);
+          log.warn("Could not deserialize protobuf from work entry for {} to {}, will retry", file,
+              ReplicationTarget.from(workEntry.getKey().getColumnQualifier()), e);
+          continue;
+        }
+
+        // Get the ReplicationTarget for this Work record
+        ReplicationTarget target = WorkSection.getTarget(workEntry.getKey(), buffer);
+
+        // Get the file (if any) currently being replicated to the given peer for the given source table
+        Collection<String> keysBeingReplicated = getQueuedWork(target);
+
+        Path p = new Path(file);
+        String filename = p.getName();
+        String key = getQueueKey(filename, target);
+
+        if (!shouldQueueWork(target)) {
+          if (!isWorkRequired(status) && keysBeingReplicated.contains(key)) {
+            log.debug("Removing {} from replication state to {} because replication is complete", key, target.getPeerName());
+            this.removeQueuedWork(target, key);
+          }
+
           continue;
         }
 
         // If there is work to do
         if (isWorkRequired(status)) {
-          Path p = new Path(file);
-          String filename = p.getName();
-          WorkSection.getTarget(entry.getKey(), buffer);
-          String key = getQueueKey(filename, ReplicationTarget.from(buffer));
-
-          // And, we haven't already queued this file up for work already
-          if (!queuedWork.contains(key)) {
-            queueWork(key, file);
-            filesWorkWasCreatedFrom++;
-          } else {
-            log.trace("Not re-queueing work for {}", key);
+          if (queueWork(p, target)) {
+            newReplicationTasksSubmitted++;
           }
         } else {
-          log.debug("Not queueing work for {} because {} doesn't need replication", file, TextFormat.shortDebugString(status));
+          log.debug("Not queueing work for {} to {} because {} doesn't need replication", file, target, ProtobufUtil.toString(status));
+          if (keysBeingReplicated.contains(key)) {
+            log.debug("Removing {} from replication state to {} because replication is complete", key, target.getPeerName());
+            this.removeQueuedWork(target, key);
+          }
         }
       }
-    } finally {
-      log.info("Created work entries for {} files", filesWorkWasCreatedFrom);
 
-      if (null != bs) {
-        bs.close();
-      }
+      log.info("Assigned {} replication work entries for {}", newReplicationTasksSubmitted, file);
     }
   }
 
   /**
-   * Distribute the work for the given path with filename
-   * 
-   * @param key
-   *          Unique key to identify this work in the queue
-   * @param path
-   *          Full path to a file
+   * @return True if replication work for the given {@link ReplicationTarget} can be submitted to be worked on
    */
-  protected void queueWork(String key, String path) {
-    try {
-      log.debug("Queued work for {} and {}", key, path);
-      workQueue.addWork(key, path);
-      queuedWork.add(key);
-    } catch (KeeperException | InterruptedException e) {
-      log.warn("Could not queue work for {}", path, e);
-    }
-  }
+  protected abstract boolean shouldQueueWork(ReplicationTarget target);
 
   /**
-   * Iterate over the queued work to remove entries that have been completed.
+   * @return the size of the queued work
    */
-  protected void cleanupFinishedWork() {
-    final Iterator<String> work = queuedWork.iterator();
-    final String instanceId = conn.getInstance().getInstanceID();
-    while (work.hasNext()) {
-      String filename = work.next();
-      // Null equates to the work was finished
-      if (null == zooCache.get(ZooUtil.getRoot(instanceId) + Constants.ZREPLICATION_WORK_QUEUE + "/" + filename)) {
-        work.remove();
-      }
-    }
-  }
+  protected abstract int getQueueSize();
+
+  /**
+   * Set up any internal state before using the WorkAssigner
+   */
+  protected abstract void initializeQueuedWork();
+
+  /**
+   * Queue the given work for the target
+   * @param path File to replicate
+   * @param target Target for the work
+   * @return True if the work was queued, false otherwise
+   */
+  protected abstract boolean queueWork(Path path, ReplicationTarget target);
+
+  /**
+   * @param target Target for the work
+   * @return Queued work for the given target
+   */
+  protected abstract Set<String> getQueuedWork(ReplicationTarget target);
+
+  /**
+   * Remove the given work from the internal state
+   * @param target
+   *          Target for the work
+   * @param queueKey
+   *          Queue key identifying the work to remove
+   */
+  protected abstract void removeQueuedWork(ReplicationTarget target, String queueKey);
+
+  /**
+   * Remove finished replication work from the internal state
+   */
+  protected abstract void cleanupFinishedWork();
 }
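
The refactored class above is a template method: assignWork() drives the common flow (initialize, scan, queue, clean up) while the concrete assigners supply policy through the abstract hooks. Here is a dependency-free sketch of that shape; all names are hypothetical, and Accumulo's Path/ReplicationTarget types are reduced to plain strings:

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

abstract class AssignerSketch {
  // Common driver, analogous to assignWork()/createWork() above
  final void assignWork(List<String> candidateKeys) {
    initializeQueuedWork();
    for (String key : candidateKeys) {
      if (shouldQueueWork(key) && queueWork(key)) {
        System.out.println("queued " + key);
      }
    }
    cleanupFinishedWork();
  }

  // Policy hooks that a concrete assigner fills in
  abstract void initializeQueuedWork();
  abstract boolean shouldQueueWork(String key);
  abstract boolean queueWork(String key);
  abstract void cleanupFinishedWork();
}

class UnorderedSketch extends AssignerSketch {
  private Set<String> queued;

  @Override
  void initializeQueuedWork() {
    if (null == queued) {
      queued = new HashSet<>();
    }
  }

  @Override
  boolean shouldQueueWork(String key) {
    return true; // no ordering constraint, replicate everything
  }

  @Override
  boolean queueWork(String key) {
    return queued.add(key); // false when the key was already queued
  }

  @Override
  void cleanupFinishedWork() {
    queued.clear(); // the real class checks ZooKeeper for completed work instead
  }

  public static void main(String[] args) {
    new UnorderedSketch().assignWork(Arrays.asList("k1", "k2", "k1"));
    // prints: queued k1, queued k2 (the duplicate k1 is skipped)
  }
}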

http://git-wip-us.apache.org/repos/asf/accumulo/blob/34da6fe9/server/master/src/main/java/org/apache/accumulo/master/replication/SequentialWorkAssigner.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/SequentialWorkAssigner.java b/server/master/src/main/java/org/apache/accumulo/master/replication/SequentialWorkAssigner.java
index d168867..e9ed34e 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/replication/SequentialWorkAssigner.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/replication/SequentialWorkAssigner.java
@@ -16,52 +16,35 @@
  */
 package org.apache.accumulo.master.replication;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Set;
 
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
-import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Range;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.protobuf.ProtobufUtil;
-import org.apache.accumulo.core.replication.ReplicationSchema.OrderSection;
-import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
 import org.apache.accumulo.core.replication.ReplicationTarget;
-import org.apache.accumulo.core.replication.StatusUtil;
-import org.apache.accumulo.core.replication.proto.Replication.Status;
-import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
-import org.apache.accumulo.server.replication.AbstractWorkAssigner;
-import org.apache.accumulo.server.replication.ReplicationTable;
-import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
-import org.apache.accumulo.server.zookeeper.ZooCache;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
 import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Preconditions;
-import com.google.protobuf.InvalidProtocolBufferException;
-
 /**
- * Creates work in ZK which is <code>filename.serialized_ReplicationTarget => filename</code>
+ * Creates work in ZK which is <code>filename.serialized_ReplicationTarget => filename</code>, but replicates
+ * files in the order in which they were created.
+ * <p>
+ * The intent is to ensure that WALs are replayed in the same order on the peer in which
+ * they were applied on the primary.
  */
-public class SequentialWorkAssigner extends AbstractWorkAssigner {
+public class SequentialWorkAssigner extends DistributedWorkQueueWorkAssigner {
   private static final Logger log = LoggerFactory.getLogger(SequentialWorkAssigner.class);
   private static final String NAME = "Sequential Work Assigner";
 
-  private Connector conn;
-  private AccumuloConfiguration conf;
-
   // @formatter:off
   /*
    * { 
@@ -73,21 +56,10 @@ public class SequentialWorkAssigner extends AbstractWorkAssigner {
   // @formatter:on
   private Map<String,Map<String,String>> queuedWorkByPeerName;
 
-  private DistributedWorkQueue workQueue;
-  private int maxQueueSize;
-  private ZooCache zooCache;
-
   public SequentialWorkAssigner() {}
 
   public SequentialWorkAssigner(AccumuloConfiguration conf, Connector conn) {
-    this.conf = conf;
-    this.conn = conn;
-  }
-
-  @Override
-  public void configure(AccumuloConfiguration conf, Connector conn) {
-    this.conf = conf;
-    this.conn = conn;
+    configure(conf, conn);
   }
 
   @Override
@@ -95,64 +67,6 @@ public class SequentialWorkAssigner extends AbstractWorkAssigner {
     return NAME;
   }
 
-  @Override
-  public void assignWork() {
-    if (null == workQueue) {
-      initializeWorkQueue(conf);
-    }
-
-    if (null == queuedWorkByPeerName) {
-      initializeQueuedWork();
-    }
-
-    if (null == zooCache) {
-      zooCache = new ZooCache();
-    }
-
-    // Get the maximum number of entries we want to queue work for (or the default)
-    this.maxQueueSize = conf.getCount(Property.REPLICATION_MAX_WORK_QUEUE);
-
-    for (Entry<String,Map<String,String>> peer : this.queuedWorkByPeerName.entrySet()) {
-      log.info("In progress replications for {}", peer.getKey());
-      for (Entry<String,String> tableRepl : peer.getValue().entrySet()) {
-        log.info("Replicating {} for table ID {}", tableRepl.getValue(), tableRepl.getKey());
-      }
-    }
-
-    // Scan over the work records, adding the work to the queue
-    createWork();
-
-    // Keep the state of the work we queued correct
-    cleanupFinishedWork();
-  }
-
-  /*
-   * Getters/setters for testing purposes
-   */
-  protected Connector getConnector() {
-    return conn;
-  }
-
-  protected void setConnector(Connector conn) {
-    this.conn = conn;
-  }
-
-  protected AccumuloConfiguration getConf() {
-    return conf;
-  }
-
-  protected void setConf(AccumuloConfiguration conf) {
-    this.conf = conf;
-  }
-
-  protected DistributedWorkQueue getWorkQueue() {
-    return workQueue;
-  }
-
-  protected void setWorkQueue(DistributedWorkQueue workQueue) {
-    this.workQueue = workQueue;
-  }
-
   protected Map<String,Map<String,String>> getQueuedWork() {
     return queuedWorkByPeerName;
   }
@@ -161,36 +75,15 @@ public class SequentialWorkAssigner extends AbstractWorkAssigner {
     this.queuedWorkByPeerName = queuedWork;
   }
 
-  protected int getMaxQueueSize() {
-    return maxQueueSize;
-  }
-
-  protected void setMaxQueueSize(int maxQueueSize) {
-    this.maxQueueSize = maxQueueSize;
-  }
-
-  protected ZooCache getZooCache() {
-    return zooCache;
-  }
-
-  protected void setZooCache(ZooCache zooCache) {
-    this.zooCache = zooCache;
-  }
-
-  /**
-   * Initialize the DistributedWorkQueue using the proper ZK location
-   * 
-   * @param conf
-   */
-  protected void initializeWorkQueue(AccumuloConfiguration conf) {
-    workQueue = new DistributedWorkQueue(ZooUtil.getRoot(conn.getInstance()) + Constants.ZREPLICATION_WORK_QUEUE, conf);
-  }
-
   /**
    * Initialize the queuedWork set with the work already sent out
    */
+  @Override
   protected void initializeQueuedWork() {
-    Preconditions.checkArgument(null == queuedWorkByPeerName, "Expected queuedWork to be null");
+    if (null != queuedWorkByPeerName) {
+      return;
+    }
+
     queuedWorkByPeerName = new HashMap<>();
     List<String> existingWork;
     try {
@@ -220,123 +113,9 @@ public class SequentialWorkAssigner extends AbstractWorkAssigner {
   }
 
   /**
-   * Scan over the {@link WorkSection} of the replication table adding work for entries that have data to replicate and have not already been queued.
-   */
-  protected void createWork() {
-    // Create a scanner over the replication table's order entries
-    Scanner s;
-    try {
-      s = ReplicationTable.getScanner(conn);
-    } catch (TableNotFoundException e) {
-      return;
-    }
-
-    OrderSection.limit(s);
-
-    Text buffer = new Text();
-    for (Entry<Key,Value> orderEntry : s) {
-      // If we're not working off the entries, we need to not shoot ourselves in the foot by continuing
-      // to add more work entries
-      if (queuedWorkByPeerName.size() > maxQueueSize) {
-        log.warn("Queued replication work exceeds configured maximum ({}), sleeping to allow work to occur", maxQueueSize);
-        return;
-      }
-
-      String file = OrderSection.getFile(orderEntry.getKey(), buffer);
-      OrderSection.getTableId(orderEntry.getKey(), buffer);
-      String sourceTableId = buffer.toString();
-
-      log.info("Determining if {} from {} needs to be replicated", file, sourceTableId);
-
-      Scanner workScanner;
-      try {
-        workScanner = ReplicationTable.getScanner(conn);
-      } catch (TableNotFoundException e) {
-        log.warn("Replication table was deleted. Will retry...");
-        UtilWaitThread.sleep(5000);
-        return;
-      }
-
-      WorkSection.limit(workScanner);
-      workScanner.setRange(Range.exact(file));
-
-      int newReplicationTasksSubmitted = 0;
-      // For a file, we can concurrently replicate it to multiple targets
-      for (Entry<Key,Value> workEntry : workScanner) {
-        Status status;
-        try {
-          status = StatusUtil.fromValue(workEntry.getValue());
-        } catch (InvalidProtocolBufferException e) {
-          log.warn("Could not deserialize protobuf from work entry for {} to {}, will retry", file,
-              ReplicationTarget.from(workEntry.getKey().getColumnQualifier()), e);
-          continue;
-        }
-
-        // Get the ReplicationTarget for this Work record
-        ReplicationTarget target = WorkSection.getTarget(workEntry.getKey(), buffer);
-
-        Map<String,String> queuedWorkForPeer = queuedWorkByPeerName.get(target.getPeerName());
-        if (null == queuedWorkForPeer) {
-          queuedWorkForPeer = new HashMap<>();
-          queuedWorkByPeerName.put(target.getPeerName(), queuedWorkForPeer);
-        }
-
-        Path p = new Path(file);
-        String filename = p.getName();
-        String key = getQueueKey(filename, target);
-
-        // Get the file (if any) currently being replicated to the given peer for the given source table
-        String keyBeingReplicated = queuedWorkForPeer.get(sourceTableId);
-
-        // If there is work to do
-        if (isWorkRequired(status)) {
-          if (null == keyBeingReplicated) {
-            // If there is no file, submit this one for replication
-            newReplicationTasksSubmitted += queueWork(key, file, sourceTableId, queuedWorkForPeer);
-          } else if (keyBeingReplicated.startsWith(p.getName())) {
-            log.debug("Not re-queueing work for {} as it has already been queued fore replication to {}", file, target);
-          } else {
-            log.debug("Not queueing {} for work as {} must be replicated to {} first", file, keyBeingReplicated, target.getPeerName());
-          }
-        } else {
-          log.debug("Not queueing work for {} to {} because {} doesn't need replication", file, target, ProtobufUtil.toString(status));
-          if (key.equals(keyBeingReplicated)) {
-            log.debug("Removing {} from replication state to {} because replication is complete", keyBeingReplicated, target.getPeerName());
-            queuedWorkForPeer.remove(sourceTableId);
-            log.debug("State after removing element: {}", this.queuedWorkByPeerName);
-          }
-        }
-      }
-
-      log.info("Assigned {} replication work entries for {}", newReplicationTasksSubmitted, file);
-    }
-  }
-
-  /**
-   * Distribute the work for the given path with filename
-   * 
-   * @param key
-   *          Unique key to identify this work in the queue
-   * @param path
-   *          Full path to a file
-   */
-  protected int queueWork(String key, String path, String sourceTableId, Map<String,String> queuedWorkForPeer) {
-    try {
-      log.debug("Queued work for {} and {} from table ID {}", key, path, sourceTableId);
-
-      workQueue.addWork(key, path);
-      queuedWorkForPeer.put(sourceTableId, key);
-
-      return 1;
-    } catch (KeeperException | InterruptedException e) {
-      log.warn("Could not queue work for {}", path, e);
-      return 0;
-    }
-  }
-
-  /**
    * Iterate over the queued work to remove entries that have been completed.
    */
+  @Override
   protected void cleanupFinishedWork() {
     final Iterator<Entry<String,Map<String,String>>> queuedWork = queuedWorkByPeerName.entrySet().iterator();
     final String instanceId = conn.getInstance().getInstanceID();
@@ -367,4 +146,83 @@ public class SequentialWorkAssigner extends AbstractWorkAssigner {
 
     log.info("Removed {} elements from internal workqueue state because the work was complete", elementsRemoved);
   }
+
+  @Override
+  protected int getQueueSize() {
+    return queuedWorkByPeerName.size();
+  }
+
+  @Override
+  protected boolean shouldQueueWork(ReplicationTarget target) {
+    Map<String,String> queuedWorkForPeer = this.queuedWorkByPeerName.get(target.getPeerName());
+    if (null == queuedWorkForPeer) {
+      return true;
+    }
+
+    String queuedWork = queuedWorkForPeer.get(target.getSourceTableId());
+
+    // If we have no work for the local table to the given peer, submit some!
+    return null == queuedWork;
+  }
+
+  @Override
+  protected boolean queueWork(Path path, ReplicationTarget target) {
+    String queueKey = getQueueKey(path.getName(), target);
+    Map<String,String> workForPeer = this.queuedWorkByPeerName.get(target.getPeerName());
+    if (null == workForPeer) {
+      workForPeer = new HashMap<>();
+      this.queuedWorkByPeerName.put(target.getPeerName(), workForPeer);
+    }
+
+    String queuedWork = workForPeer.get(target.getSourceTableId());
+    if (null == queuedWork) {
+      try {
+        workQueue.addWork(queueKey, path.toString());
+        workForPeer.put(target.getSourceTableId(), queueKey);
+      } catch (KeeperException | InterruptedException e) {
+        log.warn("Could not queue work for {} to {}", path, target, e);
+        return false;
+      }
+
+      return true;
+    } else if (queuedWork.startsWith(path.getName())) {
+      log.debug("Not re-queueing work for {} as it has already been queued for replication to {}", path, target);
+      return false;
+    } else {
+      log.debug("Not queueing {} for work as {} must be replicated to {} first", path, queuedWork, target.getPeerName());
+      return false;
+    }
+  }
+
+  @Override
+  protected Set<String> getQueuedWork(ReplicationTarget target) {
+    Map<String,String> queuedWorkForPeer = this.queuedWorkByPeerName.get(target.getPeerName());
+    if (null == queuedWorkForPeer) {
+      return Collections.emptySet();
+    }
+
+    String queuedWork = queuedWorkForPeer.get(target.getSourceTableId());
+    if (null == queuedWork) {
+      return Collections.emptySet();
+    } else {
+      return Collections.singleton(queuedWork);
+    }
+  }
+
+  @Override
+  protected void removeQueuedWork(ReplicationTarget target, String queueKey) {
+    Map<String,String> queuedWorkForPeer = this.queuedWorkByPeerName.get(target.getPeerName());
+    if (null == queuedWorkForPeer) {
+      log.warn("removeQueuedWork called when no work was queued for {}", target.getPeerName());
+      return;
+    }
+
+    String queuedWork = queuedWorkForPeer.get(target.getSourceTableId());
+    if (queueKey.equals(queuedWork)) {
+      queuedWorkForPeer.remove(target.getSourceTableId());
+    } else {
+      log.warn("removeQueuedWork called on {} with differing queueKeys, expected {} but was {}", target, queueKey, queuedWork);
+      return;
+    }
+  }
 }
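
The queuedWorkByPeerName map is what enforces the ordering guarantee: at most one file per (peer, source table id) pair is in flight, and the next file is held back until the current one completes. A small self-contained sketch of that bookkeeping, with made-up peer names, table ids, and queue keys:

import java.util.HashMap;
import java.util.Map;

public class SequentialGatingSketch {
  public static void main(String[] args) {
    // peer name -> (source table id -> queue key currently in flight)
    Map<String,Map<String,String>> queuedWorkByPeerName = new HashMap<>();

    // First WAL for table "2" on peer1: nothing in flight, so it gets queued
    queuedWorkByPeerName.computeIfAbsent("peer1", p -> new HashMap<>())
        .put("2", "wal-0001|peer1|5|2");

    // A second WAL for the same (peer, table) is gated until wal-0001 finishes
    boolean shouldQueue = null == queuedWorkByPeerName.get("peer1").get("2");
    System.out.println("queue wal-0002 now? " + shouldQueue); // false

    // Work for a different source table on the same peer is independent
    boolean otherTable = null == queuedWorkByPeerName.get("peer1").get("3");
    System.out.println("queue table 3 work now? " + otherTable); // true

    // When replication completes, removeQueuedWork clears the slot
    queuedWorkByPeerName.get("peer1").remove("2");
    System.out.println("queue wal-0002 now? "
        + (null == queuedWorkByPeerName.get("peer1").get("2"))); // true
  }
}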

http://git-wip-us.apache.org/repos/asf/accumulo/blob/34da6fe9/server/master/src/main/java/org/apache/accumulo/master/replication/UnorderedWorkAssigner.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/UnorderedWorkAssigner.java b/server/master/src/main/java/org/apache/accumulo/master/replication/UnorderedWorkAssigner.java
new file mode 100644
index 0000000..931b2a5
--- /dev/null
+++ b/server/master/src/main/java/org/apache/accumulo/master/replication/UnorderedWorkAssigner.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.master.replication;
+
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.replication.ReplicationTarget;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Read work records from the replication table, create work entries for other nodes to complete.
+ * <p>
+ * Uses the DistributedWorkQueue to make the work available for any tserver. This approach does not consider the locality of the tabletserver performing the
+ * work in relation to the data being replicated (local HDFS blocks).
+ * <p>
+ * The implementation allows for multiple tservers to concurrently replicate data to peer(s), however it is possible that data for a table is replayed on the
+ * peer in a different order than the master. The {@link SequentialWorkAssigner} should be used if this must be guaranteed at the cost of replication
+ * throughput.
+ */
+public class UnorderedWorkAssigner extends DistributedWorkQueueWorkAssigner {
+  private static final Logger log = LoggerFactory.getLogger(UnorderedWorkAssigner.class);
+  private static final String NAME = "DistributedWorkQueue Replication Work Assigner";
+
+  private Set<String> queuedWork;
+
+  public UnorderedWorkAssigner() {}
+
+  public UnorderedWorkAssigner(AccumuloConfiguration conf, Connector conn) {
+    configure(conf, conn);
+  }
+
+  @Override
+  public String getName() {
+    return NAME;
+  }
+
+  protected Set<String> getQueuedWork() {
+    return queuedWork;
+  }
+
+  protected void setQueuedWork(Set<String> queuedWork) {
+    this.queuedWork = queuedWork;
+  }
+
+  /**
+   * Initialize the queuedWork set with the work already sent out
+   */
+  @Override
+  protected void initializeQueuedWork() {
+    if (null != queuedWork) {
+      return;
+    }
+
+    queuedWork = new HashSet<>();
+    while (true) {
+      try {
+        queuedWork.addAll(workQueue.getWorkQueued());
+        return;
+      } catch (KeeperException e) {
+        if (KeeperException.Code.NONODE.equals(e.code())) {
+          log.warn("Could not find ZK root for replication work queue, will retry", e);
+          UtilWaitThread.sleep(500);
+          continue;
+        }
+
+        log.error("Error reading existing queued replication work from ZooKeeper", e);
+        throw new RuntimeException("Error reading existing queued replication work from ZooKeeper", e);
+      } catch (InterruptedException e) {
+        log.error("Error reading existing queued replication work from ZooKeeper", e);
+        throw new RuntimeException("Error reading existing queued replication work from ZooKeeper", e);
+      }
+    }
+  }
+
+  /**
+   * Distribute the work for the given path with filename
+   * 
+   * @param path
+   *          Path to the file being replicated
+   * @param target
+   *          Target for the file to be replicated to
+   */
+  @Override
+  protected boolean queueWork(Path path, ReplicationTarget target) {
+    String queueKey = getQueueKey(path.getName(), target);
+    if (queuedWork.contains(queueKey)) {
+      return false;
+    }
+
+    try {
+      log.debug("Queued work for {} and {}", queueKey, path);
+      workQueue.addWork(queueKey, path.toString());
+      queuedWork.add(queueKey);
+    } catch (KeeperException | InterruptedException e) {
+      log.warn("Could not queue work for {}", path, e);
+      return false;
+    }
+
+    return true;
+  }
+
+  /**
+   * Iterate over the queued work to remove entries that have been completed.
+   */
+  @Override
+  protected void cleanupFinishedWork() {
+    final Iterator<String> work = queuedWork.iterator();
+    final String instanceId = conn.getInstance().getInstanceID();
+    while (work.hasNext()) {
+      String filename = work.next();
+      // Null equates to the work was finished
+      if (null == zooCache.get(ZooUtil.getRoot(instanceId) + Constants.ZREPLICATION_WORK_QUEUE + "/" + filename)) {
+        work.remove();
+      }
+    }
+  }
+
+  @Override
+  protected boolean shouldQueueWork(ReplicationTarget target) {
+    // We don't care about ordering, just replicate it all
+    return true;
+  }
+
+  @Override
+  protected int getQueueSize() {
+    return this.queuedWork.size();
+  }
+
+  @Override
+  protected Set<String> getQueuedWork(ReplicationTarget target) {
+    String desiredQueueKeySuffix = KEY_SEPARATOR + target.getPeerName() + KEY_SEPARATOR + target.getRemoteIdentifier() + KEY_SEPARATOR
+        + target.getSourceTableId();
+    Set<String> queuedWorkForTarget = new HashSet<>();
+    for (String queuedWork : this.queuedWork) {
+      if (queuedWork.endsWith(desiredQueueKeySuffix)) {
+        queuedWorkForTarget.add(queuedWork);
+      }
+    }
+
+    return queuedWorkForTarget;
+  }
+
+  @Override
+  protected void removeQueuedWork(ReplicationTarget target, String queueKey) {
+    this.queuedWork.remove(queueKey);
+  }
+}
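
Because UnorderedWorkAssigner tracks queued work as one flat set of queue keys, getQueuedWork(target) depends on every key for a target sharing the suffix |peerName|remoteIdentifier|sourceTableId. A small sketch of that filter, using made-up keys:

import java.util.HashSet;
import java.util.Set;

public class SuffixLookupSketch {
  public static void main(String[] args) {
    Set<String> queuedWork = new HashSet<>();
    queuedWork.add("wal-0001|peer1|5|2");
    queuedWork.add("wal-0002|peer1|5|2");
    queuedWork.add("wal-0003|peer2|7|2");

    // Every key for a target ends with |peerName|remoteIdentifier|sourceTableId,
    // so the per-target lookup is a linear scan with endsWith()
    String desiredSuffix = "|peer1|5|2";
    Set<String> forTarget = new HashSet<>();
    for (String key : queuedWork) {
      if (key.endsWith(desiredSuffix)) {
        forTarget.add(key);
      }
    }

    System.out.println(forTarget); // the wal-0001 and wal-0002 keys
  }
}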

http://git-wip-us.apache.org/repos/asf/accumulo/blob/34da6fe9/server/master/src/test/java/org/apache/accumulo/master/replication/AbstractWorkAssignerTest.java
----------------------------------------------------------------------
diff --git a/server/master/src/test/java/org/apache/accumulo/master/replication/AbstractWorkAssignerTest.java b/server/master/src/test/java/org/apache/accumulo/master/replication/AbstractWorkAssignerTest.java
index d0f1f3f..655c81a 100644
--- a/server/master/src/test/java/org/apache/accumulo/master/replication/AbstractWorkAssignerTest.java
+++ b/server/master/src/test/java/org/apache/accumulo/master/replication/AbstractWorkAssignerTest.java
@@ -20,7 +20,6 @@ import java.util.Map.Entry;
 import java.util.UUID;
 
 import org.apache.accumulo.core.replication.ReplicationTarget;
-import org.apache.accumulo.server.replication.AbstractWorkAssigner;
 import org.apache.hadoop.fs.Path;
 import org.apache.zookeeper.common.PathUtils;
 import org.junit.Assert;
@@ -36,7 +35,7 @@ public class AbstractWorkAssignerTest {
     Path p = new Path("/accumulo/wals/tserver+port/" + UUID.randomUUID().toString());
     ReplicationTarget target = new ReplicationTarget("cluster1", "table1", "1");
 
-    String key = AbstractWorkAssigner.getQueueKey(p.toString(), target);
+    String key = DistributedWorkQueueWorkAssigner.getQueueKey(p.toString(), target);
 
     PathUtils.validatePath(key);
   }
@@ -46,9 +45,9 @@ public class AbstractWorkAssignerTest {
     Path p = new Path("/accumulo/wals/tserver+port/" + UUID.randomUUID().toString());
     ReplicationTarget target = new ReplicationTarget("cluster1", "table1", "1");
 
-    String key = AbstractWorkAssigner.getQueueKey(p.toString(), target);
+    String key = DistributedWorkQueueWorkAssigner.getQueueKey(p.toString(), target);
 
-    Entry<String,ReplicationTarget> result = AbstractWorkAssigner.fromQueueKey(key);
+    Entry<String,ReplicationTarget> result = DistributedWorkQueueWorkAssigner.fromQueueKey(key);
     Assert.assertEquals(p.toString(), result.getKey());
     Assert.assertEquals(target, result.getValue());
   }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/34da6fe9/server/master/src/test/java/org/apache/accumulo/master/replication/DistributedWorkQueueWorkAssignerTest.java
----------------------------------------------------------------------
diff --git a/server/master/src/test/java/org/apache/accumulo/master/replication/DistributedWorkQueueWorkAssignerTest.java b/server/master/src/test/java/org/apache/accumulo/master/replication/DistributedWorkQueueWorkAssignerTest.java
deleted file mode 100644
index ddd4810..0000000
--- a/server/master/src/test/java/org/apache/accumulo/master/replication/DistributedWorkQueueWorkAssignerTest.java
+++ /dev/null
@@ -1,302 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.master.replication;
-
-import static org.easymock.EasyMock.createMock;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.expectLastCall;
-import static org.easymock.EasyMock.replay;
-import static org.easymock.EasyMock.verify;
-
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.UUID;
-
-import org.apache.accumulo.core.Constants;
-import org.apache.accumulo.core.client.BatchWriter;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Instance;
-import org.apache.accumulo.core.client.mock.MockInstance;
-import org.apache.accumulo.core.client.security.tokens.PasswordToken;
-import org.apache.accumulo.core.conf.AccumuloConfiguration;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
-import org.apache.accumulo.core.replication.ReplicationTarget;
-import org.apache.accumulo.core.replication.StatusUtil;
-import org.apache.accumulo.core.security.Credentials;
-import org.apache.accumulo.core.security.TablePermission;
-import org.apache.accumulo.server.replication.AbstractWorkAssigner;
-import org.apache.accumulo.server.replication.ReplicationTable;
-import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
-import org.apache.accumulo.server.zookeeper.ZooCache;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestName;
-
-public class DistributedWorkQueueWorkAssignerTest {
-
-  @Rule
-  public TestName test = new TestName();
-
-  private AccumuloConfiguration conf;
-  private Connector conn;
-  private DistributedWorkQueueWorkAssigner assigner;
-
-  @Before
-  public void init() {
-    conf = createMock(AccumuloConfiguration.class);
-    conn = createMock(Connector.class);
-    assigner = new DistributedWorkQueueWorkAssigner(conf, conn);
-  }
-
-  @Test
-  public void workQueuedUsingFileName() throws Exception {
-    ReplicationTarget target = new ReplicationTarget("cluster1", "table1", "1");
-    Text serializedTarget = target.toText();
-
-    DistributedWorkQueue workQueue = createMock(DistributedWorkQueue.class);
-    Set<String> queuedWork = new HashSet<>();
-    assigner.setQueuedWork(queuedWork);
-    assigner.setWorkQueue(workQueue);
-
-    Path p = new Path("/accumulo/wal/tserver+port/" + UUID.randomUUID());
-
-    workQueue.addWork(p.getName() + "|" + serializedTarget.toString(), p.toString());
-    expectLastCall().once();
-
-    replay(workQueue);
-
-    assigner.queueWork(p.getName() + "|" + serializedTarget, p.toString());
-
-    Assert.assertEquals(1, queuedWork.size());
-    Assert.assertEquals(p.getName() + "|" + serializedTarget, queuedWork.iterator().next());
-  }
-
-  @Test
-  public void existingWorkIsReQueued() throws Exception {
-    DistributedWorkQueue workQueue = createMock(DistributedWorkQueue.class);
-
-    List<String> existingWork = Arrays.asList("/accumulo/wal/tserver+port/wal1", "/accumulo/wal/tserver+port/wal2");
-    expect(workQueue.getWorkQueued()).andReturn(existingWork);
-
-    replay(workQueue);
-
-    assigner.setWorkQueue(workQueue);
-    assigner.initializeQueuedWork();
-
-    verify(workQueue);
-
-    Set<String> queuedWork = assigner.getQueuedWork();
-    Assert.assertEquals("Expected existing work and queued work to be the same size", existingWork.size(), queuedWork.size());
-    Assert.assertTrue("Expected all existing work to be queued", queuedWork.containsAll(existingWork));
-  }
-
-  @Test
-  public void createWorkForFilesNeedingIt() throws Exception {
-    ReplicationTarget target1 = new ReplicationTarget("cluster1", "table1", "1"), target2 = new ReplicationTarget("cluster1", "table2", "2");
-    Text serializedTarget1 = target1.toText(), serializedTarget2 = target2.toText();
-    String keyTarget1 = target1.getPeerName() + AbstractWorkAssigner.KEY_SEPARATOR + target1.getRemoteIdentifier()
-        + AbstractWorkAssigner.KEY_SEPARATOR + target1.getSourceTableId(), keyTarget2 = target2.getPeerName()
-        + AbstractWorkAssigner.KEY_SEPARATOR + target2.getRemoteIdentifier() + AbstractWorkAssigner.KEY_SEPARATOR
-        + target2.getSourceTableId();
-
-    MockInstance inst = new MockInstance(test.getMethodName());
-    Credentials creds = new Credentials("root", new PasswordToken(""));
-    Connector conn = inst.getConnector(creds.getPrincipal(), creds.getToken());
-
-    // Set the connector
-    assigner.setConnector(conn);
-
-    // Create and grant ourselves write to the replication table
-    ReplicationTable.create(conn);
-    conn.securityOperations().grantTablePermission("root", ReplicationTable.NAME, TablePermission.WRITE);
-
-    // Create two mutations, both of which need replication work done
-    BatchWriter bw = ReplicationTable.getBatchWriter(conn);
-    String filename1 = UUID.randomUUID().toString(), filename2 = UUID.randomUUID().toString();
-    String file1 = "/accumulo/wal/tserver+port/" + filename1, file2 = "/accumulo/wal/tserver+port/" + filename2;
-    Mutation m = new Mutation(file1);
-    WorkSection.add(m, serializedTarget1, StatusUtil.openWithUnknownLengthValue());
-    bw.addMutation(m);
-
-    m = new Mutation(file2);
-    WorkSection.add(m, serializedTarget2, StatusUtil.openWithUnknownLengthValue());
-    bw.addMutation(m);
-
-    bw.close();
-
-    DistributedWorkQueue workQueue = createMock(DistributedWorkQueue.class);
-    @SuppressWarnings("unchecked")
-    HashSet<String> queuedWork = createMock(HashSet.class);
-    assigner.setQueuedWork(queuedWork);
-    assigner.setWorkQueue(workQueue);
-    assigner.setMaxQueueSize(Integer.MAX_VALUE);
-
-    expect(queuedWork.size()).andReturn(0).anyTimes();
-
-    // Make sure we expect the invocations in the correct order (accumulo is sorted)
-    if (file1.compareTo(file2) <= 0) {
-      String key = filename1 + "|" + keyTarget1;
-      expect(queuedWork.contains(key)).andReturn(false);
-      workQueue.addWork(key, file1);
-      expectLastCall().once();
-      expect(queuedWork.add(key)).andReturn(true).once();
-
-      key = filename2 + "|" + keyTarget2;
-      expect(queuedWork.contains(key)).andReturn(false);
-      workQueue.addWork(key, file2);
-      expectLastCall().once();
-      expect(queuedWork.add(key)).andReturn(true).once();
-    } else {
-      String key = filename2 + "|" + keyTarget2;
-      expect(queuedWork.contains(key)).andReturn(false);
-      workQueue.addWork(key, file2);
-      expectLastCall().once();
-      expect(queuedWork.add(key)).andReturn(true).once();
-
-      key = filename1 + "|" + keyTarget1;
-      expect(queuedWork.contains(key)).andReturn(false);
-      workQueue.addWork(key, file1);
-      expectLastCall().once();
-      expect(queuedWork.add(key)).andReturn(true).once();
-    }
-
-    replay(queuedWork, workQueue);
-
-    assigner.createWork();
-
-    verify(queuedWork, workQueue);
-  }
-
-  @Test
-  public void doNotCreateWorkForFilesNotNeedingIt() throws Exception {
-    ReplicationTarget target1 = new ReplicationTarget("cluster1", "table1", "1"), target2 = new ReplicationTarget("cluster1", "table2", "2");
-    Text serializedTarget1 = target1.toText(), serializedTarget2 = target2.toText();
-
-    MockInstance inst = new MockInstance(test.getMethodName());
-    Credentials creds = new Credentials("root", new PasswordToken(""));
-    Connector conn = inst.getConnector(creds.getPrincipal(), creds.getToken());
-
-    // Set the connector
-    assigner.setConnector(conn);
-
-    // Create and grant ourselves write to the replication table
-    ReplicationTable.create(conn);
-    conn.securityOperations().grantTablePermission("root", ReplicationTable.NAME, TablePermission.WRITE);
-
-    // Create two mutations, both of which need replication work done
-    BatchWriter bw = ReplicationTable.getBatchWriter(conn);
-    String filename1 = UUID.randomUUID().toString(), filename2 = UUID.randomUUID().toString();
-    String file1 = "/accumulo/wal/tserver+port/" + filename1, file2 = "/accumulo/wal/tserver+port/" + filename2;
-
-    Mutation m = new Mutation(file1);
-    WorkSection.add(m, serializedTarget1, StatusUtil.fileCreatedValue(5));
-    bw.addMutation(m);
-
-    m = new Mutation(file2);
-    WorkSection.add(m, serializedTarget2, StatusUtil.fileCreatedValue(10));
-    bw.addMutation(m);
-
-    bw.close();
-
-    DistributedWorkQueue workQueue = createMock(DistributedWorkQueue.class);
-    @SuppressWarnings("unchecked")
-    HashSet<String> queuedWork = createMock(HashSet.class);
-    assigner.setQueuedWork(queuedWork);
-    assigner.setMaxQueueSize(Integer.MAX_VALUE);
-
-    expect(queuedWork.size()).andReturn(0).times(2);
-
-    replay(queuedWork, workQueue);
-
-    assigner.createWork();
-
-    verify(queuedWork, workQueue);
-  }
-
-  @Test
-  public void workNotInZooKeeperIsCleanedUp() {
-    Set<String> queuedWork = new LinkedHashSet<>(Arrays.asList("wal1", "wal2"));
-    assigner.setQueuedWork(queuedWork);
-
-    Instance inst = createMock(Instance.class);
-    ZooCache cache = createMock(ZooCache.class);
-    assigner.setZooCache(cache);
-
-    expect(conn.getInstance()).andReturn(inst);
-    expect(inst.getInstanceID()).andReturn("id");
-    expect(cache.get(Constants.ZROOT + "/id" + Constants.ZREPLICATION_WORK_QUEUE + "/wal1")).andReturn(null);
-    expect(cache.get(Constants.ZROOT + "/id" + Constants.ZREPLICATION_WORK_QUEUE + "/wal2")).andReturn(null);
-
-    replay(cache, inst, conn);
-
-    assigner.cleanupFinishedWork();
-
-    verify(cache, inst, conn);
-    Assert.assertTrue("Queued work was not emptied", queuedWork.isEmpty());
-  }
-
-  @Test
-  public void workNotReAdded() throws Exception {
-    Set<String> queuedWork = new HashSet<>();
-
-    assigner.setQueuedWork(queuedWork);
-
-    ReplicationTarget target = new ReplicationTarget("cluster1", "table1", "1");
-    String serializedTarget = target.getPeerName() + AbstractWorkAssigner.KEY_SEPARATOR + target.getRemoteIdentifier()
-        + AbstractWorkAssigner.KEY_SEPARATOR + target.getSourceTableId();
-
-    queuedWork.add("wal1|" + serializedTarget.toString());
-
-    MockInstance inst = new MockInstance(test.getMethodName());
-    Credentials creds = new Credentials("root", new PasswordToken(""));
-    Connector conn = inst.getConnector(creds.getPrincipal(), creds.getToken());
-
-    // Set the connector
-    assigner.setConnector(conn);
-
-    // Create and grant ourselves write to the replication table
-    ReplicationTable.create(conn);
-    conn.securityOperations().grantTablePermission("root", ReplicationTable.NAME, TablePermission.WRITE);
-
-    // Create two mutations, both of which need replication work done
-    BatchWriter bw = ReplicationTable.getBatchWriter(conn);
-    String file1 = "/accumulo/wal/tserver+port/wal1";
-    Mutation m = new Mutation(file1);
-    WorkSection.add(m, target.toText(), StatusUtil.openWithUnknownLengthValue());
-    bw.addMutation(m);
-
-    bw.close();
-
-    DistributedWorkQueue workQueue = createMock(DistributedWorkQueue.class);
-    assigner.setWorkQueue(workQueue);
-    assigner.setMaxQueueSize(Integer.MAX_VALUE);
-
-    replay(workQueue);
-
-    assigner.createWork();
-
-    verify(workQueue);
-  }
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/34da6fe9/server/master/src/test/java/org/apache/accumulo/master/replication/SequentialWorkAssignerTest.java
----------------------------------------------------------------------
diff --git a/server/master/src/test/java/org/apache/accumulo/master/replication/SequentialWorkAssignerTest.java b/server/master/src/test/java/org/apache/accumulo/master/replication/SequentialWorkAssignerTest.java
index dce0aa6..4f56862 100644
--- a/server/master/src/test/java/org/apache/accumulo/master/replication/SequentialWorkAssignerTest.java
+++ b/server/master/src/test/java/org/apache/accumulo/master/replication/SequentialWorkAssignerTest.java
@@ -42,7 +42,6 @@ import org.apache.accumulo.core.replication.proto.Replication.Status;
 import org.apache.accumulo.core.security.Credentials;
 import org.apache.accumulo.core.security.TablePermission;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
-import org.apache.accumulo.server.replication.AbstractWorkAssigner;
 import org.apache.accumulo.server.replication.ReplicationTable;
 import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
 import org.apache.accumulo.server.zookeeper.ZooCache;
@@ -123,7 +122,7 @@ public class SequentialWorkAssignerTest {
     assigner.setMaxQueueSize(Integer.MAX_VALUE);
 
     // Make sure we expect the invocations in the correct order (accumulo is sorted)
-    workQueue.addWork(AbstractWorkAssigner.getQueueKey(filename1, target), file1);
+    workQueue.addWork(DistributedWorkQueueWorkAssigner.getQueueKey(filename1, target), file1);
     expectLastCall().once();
 
     // file2 is *not* queued because file1 must be replicated first
@@ -139,7 +138,7 @@ public class SequentialWorkAssignerTest {
     Map<String,String> cluster1Work = queuedWork.get("cluster1");
     Assert.assertEquals(1, cluster1Work.size());
     Assert.assertTrue(cluster1Work.containsKey(target.getSourceTableId()));
-    Assert.assertEquals(AbstractWorkAssigner.getQueueKey(filename1, target), cluster1Work.get(target.getSourceTableId()));
+    Assert.assertEquals(DistributedWorkQueueWorkAssigner.getQueueKey(filename1, target), cluster1Work.get(target.getSourceTableId()));
   }
 
   @Test
@@ -196,10 +195,10 @@ public class SequentialWorkAssignerTest {
     assigner.setMaxQueueSize(Integer.MAX_VALUE);
 
     // Make sure we expect the invocations in the correct order (accumulo is sorted)
-    workQueue.addWork(AbstractWorkAssigner.getQueueKey(filename1, target1), file1);
+    workQueue.addWork(DistributedWorkQueueWorkAssigner.getQueueKey(filename1, target1), file1);
     expectLastCall().once();
 
-    workQueue.addWork(AbstractWorkAssigner.getQueueKey(filename2, target2), file2);
+    workQueue.addWork(DistributedWorkQueueWorkAssigner.getQueueKey(filename2, target2), file2);
     expectLastCall().once();
 
     // Both files are queued: they are for different source tables, so ordering does not hold file2 back
@@ -216,10 +215,10 @@ public class SequentialWorkAssignerTest {
     Map<String,String> cluster1Work = queuedWork.get("cluster1");
     Assert.assertEquals(2, cluster1Work.size());
     Assert.assertTrue(cluster1Work.containsKey(target1.getSourceTableId()));
-    Assert.assertEquals(AbstractWorkAssigner.getQueueKey(filename1, target1), cluster1Work.get(target1.getSourceTableId()));
+    Assert.assertEquals(DistributedWorkQueueWorkAssigner.getQueueKey(filename1, target1), cluster1Work.get(target1.getSourceTableId()));
 
     Assert.assertTrue(cluster1Work.containsKey(target2.getSourceTableId()));
-    Assert.assertEquals(AbstractWorkAssigner.getQueueKey(filename2, target2), cluster1Work.get(target2.getSourceTableId()));
+    Assert.assertEquals(DistributedWorkQueueWorkAssigner.getQueueKey(filename2, target2), cluster1Work.get(target2.getSourceTableId()));
   }
 
   @Test
@@ -276,10 +275,10 @@ public class SequentialWorkAssignerTest {
     assigner.setMaxQueueSize(Integer.MAX_VALUE);
 
     // Make sure we expect the invocations in the correct order (accumulo is sorted)
-    workQueue.addWork(AbstractWorkAssigner.getQueueKey(filename1, target1), file1);
+    workQueue.addWork(DistributedWorkQueueWorkAssigner.getQueueKey(filename1, target1), file1);
     expectLastCall().once();
 
-    workQueue.addWork(AbstractWorkAssigner.getQueueKey(filename2, target2), file2);
+    workQueue.addWork(DistributedWorkQueueWorkAssigner.getQueueKey(filename2, target2), file2);
     expectLastCall().once();
 
     // Both files are queued: the targets are on different peer clusters, so ordering does not hold file2 back
@@ -296,12 +295,12 @@ public class SequentialWorkAssignerTest {
     Map<String,String> cluster1Work = queuedWork.get("cluster1");
     Assert.assertEquals(1, cluster1Work.size());
     Assert.assertTrue(cluster1Work.containsKey(target1.getSourceTableId()));
-    Assert.assertEquals(AbstractWorkAssigner.getQueueKey(filename1, target1), cluster1Work.get(target1.getSourceTableId()));
+    Assert.assertEquals(DistributedWorkQueueWorkAssigner.getQueueKey(filename1, target1), cluster1Work.get(target1.getSourceTableId()));
 
     Map<String,String> cluster2Work = queuedWork.get("cluster2");
     Assert.assertEquals(1, cluster2Work.size());
     Assert.assertTrue(cluster2Work.containsKey(target2.getSourceTableId()));
-    Assert.assertEquals(AbstractWorkAssigner.getQueueKey(filename2, target2), cluster2Work.get(target2.getSourceTableId()));
+    Assert.assertEquals(DistributedWorkQueueWorkAssigner.getQueueKey(filename2, target2), cluster2Work.get(target2.getSourceTableId()));
   }
 
   @Test
@@ -314,8 +313,8 @@ public class SequentialWorkAssignerTest {
     Map<String,String> cluster1Work = new TreeMap<>();
 
     // Two files for cluster1: one for table '1' and another for table '2', for which we have assigned work
-    cluster1Work.put("1", AbstractWorkAssigner.getQueueKey("file1", new ReplicationTarget("cluster1", "1", "1")));
-    cluster1Work.put("2", AbstractWorkAssigner.getQueueKey("file2", new ReplicationTarget("cluster1", "2", "2")));
+    cluster1Work.put("1", DistributedWorkQueueWorkAssigner.getQueueKey("file1", new ReplicationTarget("cluster1", "1", "1")));
+    cluster1Work.put("2", DistributedWorkQueueWorkAssigner.getQueueKey("file2", new ReplicationTarget("cluster1", "2", "2")));
 
     queuedWork.put("cluster1", cluster1Work);
 
@@ -330,11 +329,11 @@ public class SequentialWorkAssignerTest {
     // file1 replicated
     expect(
         zooCache.get(ZooUtil.getRoot("instance") + Constants.ZREPLICATION_WORK_QUEUE + "/"
-            + AbstractWorkAssigner.getQueueKey("file1", new ReplicationTarget("cluster1", "1", "1")))).andReturn(null);
+            + DistributedWorkQueueWorkAssigner.getQueueKey("file1", new ReplicationTarget("cluster1", "1", "1")))).andReturn(null);
     // file2 still needs to replicate
     expect(
         zooCache.get(ZooUtil.getRoot("instance") + Constants.ZREPLICATION_WORK_QUEUE + "/"
-            + AbstractWorkAssigner.getQueueKey("file2", new ReplicationTarget("cluster1", "2", "2")))).andReturn(new byte[0]);
+            + DistributedWorkQueueWorkAssigner.getQueueKey("file2", new ReplicationTarget("cluster1", "2", "2")))).andReturn(new byte[0]);
 
     replay(workQueue, zooCache, conn, inst);
 
@@ -343,7 +342,7 @@ public class SequentialWorkAssignerTest {
     verify(workQueue, zooCache, conn, inst);
 
     Assert.assertEquals(1, cluster1Work.size());
-    Assert.assertEquals(AbstractWorkAssigner.getQueueKey("file2", new ReplicationTarget("cluster1", "2", "2")), cluster1Work.get("2"));
+    Assert.assertEquals(DistributedWorkQueueWorkAssigner.getQueueKey("file2", new ReplicationTarget("cluster1", "2", "2")), cluster1Work.get("2"));
   }
 
   @Test
@@ -395,7 +394,7 @@ public class SequentialWorkAssignerTest {
     // Treat filename1 as we have already submitted it for replication
     Map<String,Map<String,String>> queuedWork = new HashMap<>();
     Map<String,String> queuedWorkForCluster = new HashMap<>();
-    queuedWorkForCluster.put(target.getSourceTableId(), AbstractWorkAssigner.getQueueKey(filename1, target));
+    queuedWorkForCluster.put(target.getSourceTableId(), DistributedWorkQueueWorkAssigner.getQueueKey(filename1, target));
     queuedWork.put("cluster1", queuedWorkForCluster);
 
     assigner.setQueuedWork(queuedWork);
@@ -403,10 +402,10 @@ public class SequentialWorkAssignerTest {
     assigner.setMaxQueueSize(Integer.MAX_VALUE);
 
     // Make sure we expect the invocations in the correct order (accumulo is sorted)
-    workQueue.addWork(AbstractWorkAssigner.getQueueKey(filename2, target), file2);
+    workQueue.addWork(DistributedWorkQueueWorkAssigner.getQueueKey(filename2, target), file2);
     expectLastCall().once();
 
-    // file2 is *not* queued because file1 must be replicated first
+    // file2 is queued because file1, which finished replicating, was removed from the queued work
 
     replay(workQueue);
 
@@ -419,6 +418,6 @@ public class SequentialWorkAssignerTest {
     Map<String,String> cluster1Work = queuedWork.get("cluster1");
     Assert.assertEquals(1, cluster1Work.size());
     Assert.assertTrue(cluster1Work.containsKey(target.getSourceTableId()));
-    Assert.assertEquals(AbstractWorkAssigner.getQueueKey(filename2, target), cluster1Work.get(target.getSourceTableId()));
+    Assert.assertEquals(DistributedWorkQueueWorkAssigner.getQueueKey(filename2, target), cluster1Work.get(target.getSourceTableId()));
   }
 }
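
The tests above build their expected keys by hand and compare them against DistributedWorkQueueWorkAssigner.getQueueKey, and the monitor and tserver changes below decode keys with fromQueueKey. A minimal sketch of that pair, inferred from the literal strings the tests concatenate; the class name QueueKeySketch and the (peerName, remoteIdentifier, sourceTableId) constructor order for ReplicationTarget are assumptions here, not taken from the source:

    import java.util.AbstractMap;
    import java.util.Map.Entry;

    public class QueueKeySketch {
      // The tests join key parts with a literal "|"
      static final String KEY_SEPARATOR = "|";

      // Produces "filename|peerName|remoteIdentifier|sourceTableId"
      static String getQueueKey(String filename, ReplicationTarget target) {
        return filename + KEY_SEPARATOR + target.getPeerName() + KEY_SEPARATOR
            + target.getRemoteIdentifier() + KEY_SEPARATOR + target.getSourceTableId();
      }

      // Inverse of getQueueKey; assumes no part contains the separator
      // (the WAL names used here are UUIDs, so they cannot contain '|')
      static Entry<String,ReplicationTarget> fromQueueKey(String queueKey) {
        String[] parts = queueKey.split("\\|");
        return new AbstractMap.SimpleImmutableEntry<String,ReplicationTarget>(parts[0],
            new ReplicationTarget(parts[1], parts[2], parts[3]));
      }
    }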

http://git-wip-us.apache.org/repos/asf/accumulo/blob/34da6fe9/server/master/src/test/java/org/apache/accumulo/master/replication/UnorderedWorkAssignerTest.java
----------------------------------------------------------------------
diff --git a/server/master/src/test/java/org/apache/accumulo/master/replication/UnorderedWorkAssignerTest.java b/server/master/src/test/java/org/apache/accumulo/master/replication/UnorderedWorkAssignerTest.java
new file mode 100644
index 0000000..68a9f5c
--- /dev/null
+++ b/server/master/src/test/java/org/apache/accumulo/master/replication/UnorderedWorkAssignerTest.java
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.master.replication;
+
+import static org.easymock.EasyMock.createMock;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.mock.MockInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.protobuf.ProtobufUtil;
+import org.apache.accumulo.core.replication.ReplicationSchema.OrderSection;
+import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
+import org.apache.accumulo.core.replication.ReplicationTarget;
+import org.apache.accumulo.core.replication.StatusUtil;
+import org.apache.accumulo.core.replication.proto.Replication.Status;
+import org.apache.accumulo.core.security.Credentials;
+import org.apache.accumulo.core.security.TablePermission;
+import org.apache.accumulo.server.replication.ReplicationTable;
+import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
+import org.apache.accumulo.server.zookeeper.ZooCache;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+public class UnorderedWorkAssignerTest {
+
+  @Rule
+  public TestName test = new TestName();
+
+  private AccumuloConfiguration conf;
+  private Connector conn;
+  private UnorderedWorkAssigner assigner;
+
+  @Before
+  public void init() {
+    conf = createMock(AccumuloConfiguration.class);
+    conn = createMock(Connector.class);
+    assigner = new UnorderedWorkAssigner(conf, conn);
+  }
+
+  @Test
+  public void workQueuedUsingFileName() throws Exception {
+    ReplicationTarget target = new ReplicationTarget("cluster1", "table1", "1");
+
+    DistributedWorkQueue workQueue = createMock(DistributedWorkQueue.class);
+    Set<String> queuedWork = new HashSet<>();
+    assigner.setQueuedWork(queuedWork);
+    assigner.setWorkQueue(workQueue);
+
+    Path p = new Path("/accumulo/wal/tserver+port/" + UUID.randomUUID());
+
+    String expectedQueueKey = p.getName() + DistributedWorkQueueWorkAssigner.KEY_SEPARATOR + target.getPeerName() + DistributedWorkQueueWorkAssigner.KEY_SEPARATOR
+        + target.getRemoteIdentifier() + DistributedWorkQueueWorkAssigner.KEY_SEPARATOR + target.getSourceTableId();
+
+    workQueue.addWork(expectedQueueKey, p.toString());
+    expectLastCall().once();
+
+    replay(workQueue);
+
+    assigner.queueWork(p, target);
+
+    Assert.assertEquals(1, queuedWork.size());
+    Assert.assertEquals(expectedQueueKey, queuedWork.iterator().next());
+  }
+
+  @Test
+  public void existingWorkIsReQueued() throws Exception {
+    DistributedWorkQueue workQueue = createMock(DistributedWorkQueue.class);
+
+    List<String> existingWork = Arrays.asList("/accumulo/wal/tserver+port/wal1", "/accumulo/wal/tserver+port/wal2");
+    expect(workQueue.getWorkQueued()).andReturn(existingWork);
+
+    replay(workQueue);
+
+    assigner.setWorkQueue(workQueue);
+    assigner.initializeQueuedWork();
+
+    verify(workQueue);
+
+    Set<String> queuedWork = assigner.getQueuedWork();
+    Assert.assertEquals("Expected existing work and queued work to be the same size", existingWork.size(), queuedWork.size());
+    Assert.assertTrue("Expected all existing work to be queued", queuedWork.containsAll(existingWork));
+  }
+
+  @Test
+  public void createWorkForFilesNeedingIt() throws Exception {
+    ReplicationTarget target1 = new ReplicationTarget("cluster1", "table1", "1"), target2 = new ReplicationTarget("cluster1", "table2", "2");
+    Text serializedTarget1 = target1.toText(), serializedTarget2 = target2.toText();
+    String keyTarget1 = target1.getPeerName() + DistributedWorkQueueWorkAssigner.KEY_SEPARATOR + target1.getRemoteIdentifier()
+        + DistributedWorkQueueWorkAssigner.KEY_SEPARATOR + target1.getSourceTableId(), keyTarget2 = target2.getPeerName()
+        + DistributedWorkQueueWorkAssigner.KEY_SEPARATOR + target2.getRemoteIdentifier() + DistributedWorkQueueWorkAssigner.KEY_SEPARATOR
+        + target2.getSourceTableId();
+
+    MockInstance inst = new MockInstance(test.getMethodName());
+    Credentials creds = new Credentials("root", new PasswordToken(""));
+    Connector conn = inst.getConnector(creds.getPrincipal(), creds.getToken());
+
+    // Set the connector
+    assigner.setConnector(conn);
+
+    // Create and grant ourselves write to the replication table
+    ReplicationTable.create(conn);
+    conn.securityOperations().grantTablePermission("root", ReplicationTable.NAME, TablePermission.WRITE);
+
+    Status.Builder builder = Status.newBuilder().setBegin(0).setEnd(0).setInfiniteEnd(true).setClosed(false).setCreatedTime(5L);
+    Status status1 = builder.build();
+    builder.setCreatedTime(10L);
+    Status status2 = builder.build();
+
+    // Create two mutations, both of which need replication work done
+    BatchWriter bw = ReplicationTable.getBatchWriter(conn);
+    String filename1 = UUID.randomUUID().toString(), filename2 = UUID.randomUUID().toString();
+    String file1 = "/accumulo/wal/tserver+port/" + filename1, file2 = "/accumulo/wal/tserver+port/" + filename2;
+    Mutation m = new Mutation(file1);
+    WorkSection.add(m, serializedTarget1, ProtobufUtil.toValue(status1));
+    bw.addMutation(m);
+    m = OrderSection.createMutation(file1, status1.getCreatedTime());
+    OrderSection.add(m, new Text(target1.getSourceTableId()), ProtobufUtil.toValue(status1));
+    bw.addMutation(m);
+
+    m = new Mutation(file2);
+    WorkSection.add(m, serializedTarget2, ProtobufUtil.toValue(status2));
+    bw.addMutation(m);
+    m = OrderSection.createMutation(file2, status2.getCreatedTime());
+    OrderSection.add(m, new Text(target2.getSourceTableId()), ProtobufUtil.toValue(status2));
+    bw.addMutation(m);
+
+    bw.close();
+
+    DistributedWorkQueue workQueue = createMock(DistributedWorkQueue.class);
+    HashSet<String> queuedWork = new HashSet<>();
+    assigner.setQueuedWork(queuedWork);
+    assigner.setWorkQueue(workQueue);
+    assigner.setMaxQueueSize(Integer.MAX_VALUE);
+
+    // Make sure we expect the invocations in the order they were created
+    String key = filename1 + "|" + keyTarget1;
+    workQueue.addWork(key, file1);
+    expectLastCall().once();
+
+    key = filename2 + "|" + keyTarget2;
+    workQueue.addWork(key, file2);
+    expectLastCall().once();
+
+    replay(workQueue);
+
+    assigner.createWork();
+
+    verify(workQueue);
+  }
+
+  @Test
+  public void doNotCreateWorkForFilesNotNeedingIt() throws Exception {
+    ReplicationTarget target1 = new ReplicationTarget("cluster1", "table1", "1"), target2 = new ReplicationTarget("cluster1", "table2", "2");
+    Text serializedTarget1 = target1.toText(), serializedTarget2 = target2.toText();
+
+    MockInstance inst = new MockInstance(test.getMethodName());
+    Credentials creds = new Credentials("root", new PasswordToken(""));
+    Connector conn = inst.getConnector(creds.getPrincipal(), creds.getToken());
+
+    // Set the connector
+    assigner.setConnector(conn);
+
+    // Create and grant ourselves write to the replication table
+    ReplicationTable.create(conn);
+    conn.securityOperations().grantTablePermission("root", ReplicationTable.NAME, TablePermission.WRITE);
+
+    // Create two mutations for files that do not yet need replication work
+    BatchWriter bw = ReplicationTable.getBatchWriter(conn);
+    String filename1 = UUID.randomUUID().toString(), filename2 = UUID.randomUUID().toString();
+    String file1 = "/accumulo/wal/tserver+port/" + filename1, file2 = "/accumulo/wal/tserver+port/" + filename2;
+
+    Mutation m = new Mutation(file1);
+    WorkSection.add(m, serializedTarget1, StatusUtil.fileCreatedValue(5));
+    bw.addMutation(m);
+
+    m = new Mutation(file2);
+    WorkSection.add(m, serializedTarget2, StatusUtil.fileCreatedValue(10));
+    bw.addMutation(m);
+
+    bw.close();
+
+    DistributedWorkQueue workQueue = createMock(DistributedWorkQueue.class);
+    HashSet<String> queuedWork = new HashSet<>();
+    assigner.setQueuedWork(queuedWork);
+    assigner.setMaxQueueSize(Integer.MAX_VALUE);
+
+    replay(workQueue);
+
+    assigner.createWork();
+
+    verify(workQueue);
+  }
+
+  @Test
+  public void workNotInZooKeeperIsCleanedUp() {
+    Set<String> queuedWork = new LinkedHashSet<>(Arrays.asList("wal1", "wal2"));
+    assigner.setQueuedWork(queuedWork);
+
+    Instance inst = createMock(Instance.class);
+    ZooCache cache = createMock(ZooCache.class);
+    assigner.setZooCache(cache);
+
+    expect(conn.getInstance()).andReturn(inst);
+    expect(inst.getInstanceID()).andReturn("id");
+    expect(cache.get(Constants.ZROOT + "/id" + Constants.ZREPLICATION_WORK_QUEUE + "/wal1")).andReturn(null);
+    expect(cache.get(Constants.ZROOT + "/id" + Constants.ZREPLICATION_WORK_QUEUE + "/wal2")).andReturn(null);
+
+    replay(cache, inst, conn);
+
+    assigner.cleanupFinishedWork();
+
+    verify(cache, inst, conn);
+    Assert.assertTrue("Queued work was not emptied", queuedWork.isEmpty());
+  }
+
+  @Test
+  public void workNotReAdded() throws Exception {
+    Set<String> queuedWork = new HashSet<>();
+
+    assigner.setQueuedWork(queuedWork);
+
+    ReplicationTarget target = new ReplicationTarget("cluster1", "table1", "1");
+    String serializedTarget = target.getPeerName() + DistributedWorkQueueWorkAssigner.KEY_SEPARATOR + target.getRemoteIdentifier()
+        + DistributedWorkQueueWorkAssigner.KEY_SEPARATOR + target.getSourceTableId();
+
+    queuedWork.add("wal1|" + serializedTarget.toString());
+
+    MockInstance inst = new MockInstance(test.getMethodName());
+    Credentials creds = new Credentials("root", new PasswordToken(""));
+    Connector conn = inst.getConnector(creds.getPrincipal(), creds.getToken());
+
+    // Set the connector
+    assigner.setConnector(conn);
+
+    // Create and grant ourselves write to the replication table
+    ReplicationTable.create(conn);
+    conn.securityOperations().grantTablePermission("root", ReplicationTable.NAME, TablePermission.WRITE);
+
+    // Create a mutation for a file whose work has already been queued
+    BatchWriter bw = ReplicationTable.getBatchWriter(conn);
+    String file1 = "/accumulo/wal/tserver+port/wal1";
+    Mutation m = new Mutation(file1);
+    WorkSection.add(m, target.toText(), StatusUtil.openWithUnknownLengthValue());
+    bw.addMutation(m);
+
+    bw.close();
+
+    DistributedWorkQueue workQueue = createMock(DistributedWorkQueue.class);
+    assigner.setWorkQueue(workQueue);
+    assigner.setMaxQueueSize(Integer.MAX_VALUE);
+
+    replay(workQueue);
+
+    assigner.createWork();
+
+    verify(workQueue);
+  }
+}
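
Between them, workQueuedUsingFileName, workNotReAdded, and workNotInZooKeeperIsCleanedUp pin down the UnorderedWorkAssigner's bookkeeping. A sketch of the two operations those tests appear to exercise, assuming the queuedWork, workQueue, zooCache, and conn fields correspond to the setters called above (imports and error handling omitted):

    // Queue a WAL for one target: the key is built from the file *name*,
    // while the full path travels as the payload.
    void queueWork(Path path, ReplicationTarget target) {
      String key = DistributedWorkQueueWorkAssigner.getQueueKey(path.getName(), target);
      workQueue.addWork(key, path.toString());
      queuedWork.add(key); // remembered so the same file is not re-queued
    }

    // A key whose node is gone from the ZooKeeper work queue has finished,
    // so drop it from the local bookkeeping.
    void cleanupFinishedWork() {
      String instanceId = conn.getInstance().getInstanceID();
      Iterator<String> work = queuedWork.iterator();
      while (work.hasNext()) {
        String key = work.next();
        if (zooCache.get(Constants.ZROOT + "/" + instanceId + Constants.ZREPLICATION_WORK_QUEUE + "/" + key) == null) {
          work.remove();
        }
      }
    }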

http://git-wip-us.apache.org/repos/asf/accumulo/blob/34da6fe9/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ReplicationServlet.java
----------------------------------------------------------------------
diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ReplicationServlet.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ReplicationServlet.java
index 5e7f255..810757e 100644
--- a/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ReplicationServlet.java
+++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ReplicationServlet.java
@@ -49,11 +49,11 @@ import org.apache.accumulo.core.replication.proto.Replication.Status;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.security.Credentials;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.master.replication.DistributedWorkQueueWorkAssigner;
 import org.apache.accumulo.monitor.util.Table;
 import org.apache.accumulo.monitor.util.celltypes.NumberType;
 import org.apache.accumulo.server.client.HdfsZooInstance;
 import org.apache.accumulo.server.conf.ServerConfiguration;
-import org.apache.accumulo.server.replication.AbstractWorkAssigner;
 import org.apache.accumulo.server.security.SystemCredentials;
 import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
 import org.apache.accumulo.server.zookeeper.ZooCache;
@@ -212,7 +212,7 @@ public class ReplicationServlet extends BasicServlet {
     DistributedWorkQueue workQueue = new DistributedWorkQueue(workQueuePath, ServerConfiguration.getSystemConfiguration(inst));
 
     for (String queueKey : workQueue.getWorkQueued()) {
-      Entry<String,ReplicationTarget> queueKeyPair = AbstractWorkAssigner.fromQueueKey(queueKey);
+      Entry<String,ReplicationTarget> queueKeyPair = DistributedWorkQueueWorkAssigner.fromQueueKey(queueKey);
       String filename = queueKeyPair.getKey();
       ReplicationTarget target = queueKeyPair.getValue();
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/34da6fe9/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java
index 3ebcda9..199de91 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java
@@ -37,9 +37,9 @@ import org.apache.accumulo.core.replication.ReplicationTarget;
 import org.apache.accumulo.core.replication.StatusUtil;
 import org.apache.accumulo.core.replication.proto.Replication.Status;
 import org.apache.accumulo.core.security.Credentials;
+import org.apache.accumulo.master.replication.DistributedWorkQueueWorkAssigner;
 import org.apache.accumulo.server.conf.ServerConfiguration;
 import org.apache.accumulo.server.fs.VolumeManager;
-import org.apache.accumulo.server.replication.AbstractWorkAssigner;
 import org.apache.accumulo.server.replication.ReplicationTable;
 import org.apache.accumulo.server.zookeeper.DistributedWorkQueue.Processor;
 import org.apache.hadoop.fs.Path;
@@ -76,7 +76,7 @@ public class ReplicationProcessor implements Processor {
 
   @Override
   public void process(String workID, byte[] data) {
-    ReplicationTarget target = AbstractWorkAssigner.fromQueueKey(workID).getValue();
+    ReplicationTarget target = DistributedWorkQueueWorkAssigner.fromQueueKey(workID).getValue();
     String file = new String(data);
 
     log.debug("Received replication work for {} to {}", file, target);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/34da6fe9/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/ReplicationProcessorTest.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/ReplicationProcessorTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/ReplicationProcessorTest.java
index 17d5309..6ed589a 100644
--- a/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/ReplicationProcessorTest.java
+++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/ReplicationProcessorTest.java
@@ -29,8 +29,8 @@ import org.apache.accumulo.core.replication.ReplicaSystemHelper;
 import org.apache.accumulo.core.replication.ReplicationTarget;
 import org.apache.accumulo.core.replication.proto.Replication.Status;
 import org.apache.accumulo.core.security.Credentials;
+import org.apache.accumulo.master.replication.DistributedWorkQueueWorkAssigner;
 import org.apache.accumulo.server.fs.VolumeManager;
-import org.apache.accumulo.server.replication.AbstractWorkAssigner;
 import org.apache.hadoop.fs.Path;
 import org.easymock.EasyMock;
 import org.junit.Assert;
@@ -83,7 +83,7 @@ public class ReplicationProcessorTest {
     Status status = Status.newBuilder().setBegin(0).setEnd(0).setInfiniteEnd(true).setClosed(true).build();
     Path path = new Path("/accumulo");
 
-    String queueKey = AbstractWorkAssigner.getQueueKey(path.toString(), target);
+    String queueKey = DistributedWorkQueueWorkAssigner.getQueueKey(path.toString(), target);
 
     EasyMock.expect(proc.getReplicaSystem(target)).andReturn(replica);
     EasyMock.expect(proc.getStatus(path.toString(), target)).andReturn(status);

