accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From els...@apache.org
Subject [22/50] [abbrv] git commit: ACCUMULO-2819 Initial pass at a new WorkAssigner that will only replicate the oldest file per table per peer
Date Wed, 21 May 2014 01:59:41 GMT
ACCUMULO-2819 Initial pass at a new WorkAssigner that will only replicate the oldest file per table per peer

By tracking the order in which WALs were MinC'ed, we can use this order to determine
the practical order for which they should be replayed on each table of each peer. This
aims to remove the potential problem that the DWQWorkAssigner faced in that WAL2 might be
replicated before WAL1 (where WAL1 was written before WAL2 locally).


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/a59692dc
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/a59692dc
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/a59692dc

Branch: refs/heads/ACCUMULO-378
Commit: a59692dc62170e4bc3a47d7cf881be9ba6027220
Parents: 8921e32
Author: Josh Elser <elserj@apache.org>
Authored: Fri May 16 18:13:22 2014 -0400
Committer: Josh Elser <elserj@apache.org>
Committed: Fri May 16 18:13:22 2014 -0400

----------------------------------------------------------------------
 .../replication/AbstractWorkAssigner.java       |  81 +++++
 .../ReplicationWorkAssignerHelper.java          |  75 -----
 .../server/replication/WorkAssigner.java        |  42 +++
 .../ReplicationWorkAssignerHelperTest.java      |  56 ----
 .../master/replication/AbsractWorkAssigner.java |  30 --
 .../DistributedWorkQueueWorkAssigner.java       |   6 +-
 .../replication/SequentialWorkAssigner.java     | 323 ++++++++++++++++++-
 .../master/replication/WorkAssigner.java        |  42 ---
 .../accumulo/master/replication/WorkDriver.java |   1 +
 .../replication/AbstractWorkAssignerTest.java   |  56 ++++
 .../DistributedWorkQueueWorkAssignerTest.java   |  73 ++++-
 .../replication/SequentialWorkAssignerTest.java | 142 ++++++++
 .../replication/ReplicationProcessor.java       |   4 +-
 13 files changed, 714 insertions(+), 217 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/a59692dc/server/base/src/main/java/org/apache/accumulo/server/replication/AbstractWorkAssigner.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/replication/AbstractWorkAssigner.java b/server/base/src/main/java/org/apache/accumulo/server/replication/AbstractWorkAssigner.java
new file mode 100644
index 0000000..2a7b825
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/replication/AbstractWorkAssigner.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.replication;
+
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.replication.ReplicationTarget;
+import org.apache.accumulo.core.replication.StatusUtil;
+import org.apache.accumulo.core.replication.proto.Replication.Status;
+import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+
+/**
+ * Common methods for {@link WorkAssigner}s
+ */
+public abstract class AbstractWorkAssigner implements WorkAssigner {
+
+  protected boolean isWorkRequired(Status status) {
+    return StatusUtil.isWorkRequired(status);
+  }
+  public static final String KEY_SEPARATOR = "|";
+
+  /**
+   * Serialize a filename and a {@link ReplicationTarget} into the expected key format for use with the {@link DistributedWorkQueue}
+   * 
+   * @param filename
+   *          Filename for data to be replicated
+   * @param replTarget
+   *          Information about replication peer
+   * @return Key for identifying work in queue
+   */
+  public static String getQueueKey(String filename, ReplicationTarget replTarget) {
+    return filename + KEY_SEPARATOR + replTarget.getPeerName() + KEY_SEPARATOR + replTarget.getRemoteIdentifier() + KEY_SEPARATOR
+        + replTarget.getSourceTableId();
+  }
+
+  /**
+   * @param queueKey
+   *          Key from the work queue
+   * @return Components which created the queue key
+   */
+  public static Entry<String,ReplicationTarget> fromQueueKey(String queueKey) {
+    Preconditions.checkNotNull(queueKey);
+
+    int index = queueKey.indexOf(KEY_SEPARATOR);
+    if (-1 == index) {
+      throw new IllegalArgumentException("Could not find expected separator in queue key '" + queueKey + "'");
+    }
+
+    String filename = queueKey.substring(0, index);
+
+    int secondIndex = queueKey.indexOf(KEY_SEPARATOR, index + 1);
+    if (-1 == secondIndex) {
+      throw new IllegalArgumentException("Could not find expected separator in queue key '" + queueKey + "'");
+    }
+
+    int thirdIndex = queueKey.indexOf(KEY_SEPARATOR, secondIndex + 1);
+    if (-1 == thirdIndex) {
+      throw new IllegalArgumentException("Could not find expected separator in queue key '" + queueKey + "'");
+    }
+
+    return Maps.immutableEntry(filename, new ReplicationTarget(queueKey.substring(index + 1, secondIndex), queueKey.substring(secondIndex + 1, thirdIndex),
+        queueKey.substring(thirdIndex + 1)));
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/a59692dc/server/base/src/main/java/org/apache/accumulo/server/replication/ReplicationWorkAssignerHelper.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/replication/ReplicationWorkAssignerHelper.java b/server/base/src/main/java/org/apache/accumulo/server/replication/ReplicationWorkAssignerHelper.java
deleted file mode 100644
index 13ed4a9..0000000
--- a/server/base/src/main/java/org/apache/accumulo/server/replication/ReplicationWorkAssignerHelper.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.replication;
-
-import java.util.Map.Entry;
-
-import org.apache.accumulo.core.replication.ReplicationTarget;
-import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
-
-/**
- * 
- */
-public class ReplicationWorkAssignerHelper {
-  public static final String KEY_SEPARATOR = "|";
-
-  /**
-   * Serialize a filename and a {@link ReplicationTarget} into the expected key format for use with the {@link DistributedWorkQueue}
-   * 
-   * @param filename
-   *          Filename for data to be replicated
-   * @param replTarget
-   *          Information about replication peer
-   * @return Key for identifying work in queue
-   */
-  public static String getQueueKey(String filename, ReplicationTarget replTarget) {
-    return filename + KEY_SEPARATOR + replTarget.getPeerName() + KEY_SEPARATOR + replTarget.getRemoteIdentifier() + KEY_SEPARATOR
-        + replTarget.getSourceTableId();
-  }
-
-  /**
-   * @param queueKey
-   *          Key from the work queue
-   * @return Components which created the queue key
-   */
-  public static Entry<String,ReplicationTarget> fromQueueKey(String queueKey) {
-    Preconditions.checkNotNull(queueKey);
-
-    int index = queueKey.indexOf(KEY_SEPARATOR);
-    if (-1 == index) {
-      throw new IllegalArgumentException("Could not find expected separator in queue key '" + queueKey + "'");
-    }
-
-    String filename = queueKey.substring(0, index);
-
-    int secondIndex = queueKey.indexOf(KEY_SEPARATOR, index + 1);
-    if (-1 == secondIndex) {
-      throw new IllegalArgumentException("Could not find expected separator in queue key '" + queueKey + "'");
-    }
-
-    int thirdIndex = queueKey.indexOf(KEY_SEPARATOR, secondIndex + 1);
-    if (-1 == thirdIndex) {
-      throw new IllegalArgumentException("Could not find expected seperator in queue key '" + queueKey + "'");
-    }
-
-    return Maps.immutableEntry(filename, new ReplicationTarget(queueKey.substring(index + 1, secondIndex), queueKey.substring(secondIndex + 1, thirdIndex),
-        queueKey.substring(thirdIndex + 1)));
-  }
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/a59692dc/server/base/src/main/java/org/apache/accumulo/server/replication/WorkAssigner.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/replication/WorkAssigner.java b/server/base/src/main/java/org/apache/accumulo/server/replication/WorkAssigner.java
new file mode 100644
index 0000000..1069835
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/replication/WorkAssigner.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.replication;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+
+
+/**
+ * Interface to allow for multiple implementations that assign replication work
+ */
+public interface WorkAssigner {
+
+  /**
+   * @return The name for this WorkAssigner
+   */
+  public String getName();
+
+  /**
+   * Configure the WorkAssigner implementation
+   */
+  public void configure(AccumuloConfiguration conf, Connector conn);
+
+  /**
+   * Assign work for replication
+   */
+  public void assignWork();
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/a59692dc/server/base/src/test/java/org/apache/accumulo/server/replication/ReplicationWorkAssignerHelperTest.java
----------------------------------------------------------------------
diff --git a/server/base/src/test/java/org/apache/accumulo/server/replication/ReplicationWorkAssignerHelperTest.java b/server/base/src/test/java/org/apache/accumulo/server/replication/ReplicationWorkAssignerHelperTest.java
deleted file mode 100644
index 47a4a0d..0000000
--- a/server/base/src/test/java/org/apache/accumulo/server/replication/ReplicationWorkAssignerHelperTest.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.replication;
-
-import java.util.Map.Entry;
-import java.util.UUID;
-
-import org.apache.accumulo.core.replication.ReplicationTarget;
-import org.apache.accumulo.server.replication.ReplicationWorkAssignerHelper;
-import org.apache.hadoop.fs.Path;
-import org.apache.zookeeper.common.PathUtils;
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- * 
- */
-public class ReplicationWorkAssignerHelperTest {
-
-  @Test
-  public void createsValidZKNodeName() {
-    Path p = new Path ("/accumulo/wals/tserver+port/" + UUID.randomUUID().toString());
-    ReplicationTarget target = new ReplicationTarget("cluster1", "table1", "1");
-
-    String key = ReplicationWorkAssignerHelper.getQueueKey(p.toString(), target);
-    
-    PathUtils.validatePath(key);
-  }
-
-  @Test
-  public void queueKeySerialization() {
-    Path p = new Path("/accumulo/wals/tserver+port/" + UUID.randomUUID().toString());
-    ReplicationTarget target = new ReplicationTarget("cluster1", "table1", "1");
-
-    String key = ReplicationWorkAssignerHelper.getQueueKey(p.toString(), target);
-
-    Entry<String,ReplicationTarget> result = ReplicationWorkAssignerHelper.fromQueueKey(key);
-    Assert.assertEquals(p.toString(), result.getKey());
-    Assert.assertEquals(target, result.getValue());
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/a59692dc/server/master/src/main/java/org/apache/accumulo/master/replication/AbsractWorkAssigner.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/AbsractWorkAssigner.java b/server/master/src/main/java/org/apache/accumulo/master/replication/AbsractWorkAssigner.java
deleted file mode 100644
index eea6853..0000000
--- a/server/master/src/main/java/org/apache/accumulo/master/replication/AbsractWorkAssigner.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.master.replication;
-
-import org.apache.accumulo.core.replication.StatusUtil;
-import org.apache.accumulo.core.replication.proto.Replication.Status;
-
-/**
- * Common methods for {@link WorkAssigner}s
- */
-public abstract class AbsractWorkAssigner implements WorkAssigner {
-
-  protected boolean isWorkRequired(Status status) {
-    return StatusUtil.isWorkRequired(status);
-  }
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/a59692dc/server/master/src/main/java/org/apache/accumulo/master/replication/DistributedWorkQueueWorkAssigner.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/DistributedWorkQueueWorkAssigner.java b/server/master/src/main/java/org/apache/accumulo/master/replication/DistributedWorkQueueWorkAssigner.java
index a84e6a0..e97f3ca 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/replication/DistributedWorkQueueWorkAssigner.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/replication/DistributedWorkQueueWorkAssigner.java
@@ -37,8 +37,8 @@ import org.apache.accumulo.core.replication.StatusUtil;
 import org.apache.accumulo.core.replication.proto.Replication.Status;
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.server.replication.AbstractWorkAssigner;
 import org.apache.accumulo.server.replication.ReplicationTable;
-import org.apache.accumulo.server.replication.ReplicationWorkAssignerHelper;
 import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
 import org.apache.accumulo.server.zookeeper.ZooCache;
 import org.apache.hadoop.fs.Path;
@@ -61,7 +61,7 @@ import com.google.protobuf.TextFormat;
  * peer in a different order than the master. The {@link SequentialWorkAssigner} should be used if this must be guaranteed at the cost of replication
  * throughput.
  */
-public class DistributedWorkQueueWorkAssigner extends AbsractWorkAssigner {
+public class DistributedWorkQueueWorkAssigner extends AbstractWorkAssigner {
   private static final Logger log = LoggerFactory.getLogger(DistributedWorkQueueWorkAssigner.class);
   private static final String NAME = "DistributedWorkQueue Replication Work Assigner";
 
@@ -230,7 +230,7 @@ public class DistributedWorkQueueWorkAssigner extends AbsractWorkAssigner {
           Path p = new Path(file);
           String filename = p.getName();
           WorkSection.getTarget(entry.getKey(), buffer);
-          String key = ReplicationWorkAssignerHelper.getQueueKey(filename, ReplicationTarget.from(buffer));
+          String key = getQueueKey(filename, ReplicationTarget.from(buffer));
 
           // And, we haven't already queued this file up for work already
           if (!queuedWork.contains(key)) {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/a59692dc/server/master/src/main/java/org/apache/accumulo/master/replication/SequentialWorkAssigner.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/SequentialWorkAssigner.java b/server/master/src/main/java/org/apache/accumulo/master/replication/SequentialWorkAssigner.java
index 8058a2b..e295ef7 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/replication/SequentialWorkAssigner.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/replication/SequentialWorkAssigner.java
@@ -16,29 +16,336 @@
  */
 package org.apache.accumulo.master.replication;
 
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.protobuf.ProtobufUtil;
+import org.apache.accumulo.core.replication.ReplicationSchema.OrderSection;
+import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
+import org.apache.accumulo.core.replication.ReplicationTarget;
+import org.apache.accumulo.core.replication.StatusUtil;
+import org.apache.accumulo.core.replication.proto.Replication.Status;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.server.replication.AbstractWorkAssigner;
+import org.apache.accumulo.server.replication.ReplicationTable;
+import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
+import org.apache.accumulo.server.zookeeper.ZooCache;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.InvalidProtocolBufferException;
 
 /**
- * 
+ * Creates work in ZK which is <code>filename.serialized_ReplicationTarget => filename</code>
  */
-public class SequentialWorkAssigner extends AbsractWorkAssigner {
+public class SequentialWorkAssigner extends AbstractWorkAssigner {
+  private static final Logger log = LoggerFactory.getLogger(SequentialWorkAssigner.class);
+  private static final String NAME = "Sequential Work Assigner";
 
-  @Override
-  public String getName() {
-    // TODO Auto-generated method stub
-    return null;
+  private Connector conn;
+  private AccumuloConfiguration conf;
+
+  // @formatter:off
+  /*
+   * { 
+   *    peer1 => {sourceTableId1 => work_queue_key1, sourceTableId2 => work_queue_key2, ...}
+   *    peer2 => {sourceTableId1 => work_queue_key1, sourceTableId3 => work_queue_key4, ...}
+   *    ...
+   * }
+   */
+  // @formatter:on
+  private Map<String,Map<String,String>> queuedWorkByPeerName;
+
+  private DistributedWorkQueue workQueue;
+  private int maxQueueSize;
+  private ZooCache zooCache;
+
+  public SequentialWorkAssigner() {}
+
+  public SequentialWorkAssigner(AccumuloConfiguration conf, Connector conn) {
+    this.conf = conf;
+    this.conn = conn;
   }
 
   @Override
   public void configure(AccumuloConfiguration conf, Connector conn) {
-    // TODO Auto-generated method stub
+    this.conf = conf;
+    this.conn = conn;
+  }
+
+  @Override
+  public String getName() {
+    return NAME;
   }
 
   @Override
   public void assignWork() {
-    // TODO Auto-generated method stub
+    if (null == workQueue) {
+      initializeWorkQueue(conf);
+    }
+
+    if (null == queuedWorkByPeerName) {
+      initializeQueuedWork();
+    }
+
+    if (null == zooCache) {
+      zooCache = new ZooCache();
+    }
+
+    // Get the maximum number of entries we want to queue work for (or the default)
+    this.maxQueueSize = conf.getCount(Property.REPLICATION_MAX_WORK_QUEUE);
+
+    // Scan over the work records, adding the work to the queue
+    createWork();
+
+    // Keep the state of the work we queued correct
+    cleanupFinishedWork();
+  }
+
+  /*
+   * Getters/setters for testing purposes
+   */
+  protected Connector getConnector() {
+    return conn;
+  }
+
+  protected void setConnector(Connector conn) {
+    this.conn = conn;
+  }
+
+  protected AccumuloConfiguration getConf() {
+    return conf;
+  }
+
+  protected void setConf(AccumuloConfiguration conf) {
+    this.conf = conf;
+  }
+
+  protected DistributedWorkQueue getWorkQueue() {
+    return workQueue;
+  }
+
+  protected void setWorkQueue(DistributedWorkQueue workQueue) {
+    this.workQueue = workQueue;
+  }
+
+  protected Map<String,Map<String,String>> getQueuedWork() {
+    return queuedWorkByPeerName;
+  }
+
+  protected void setQueuedWork(Map<String,Map<String,String>> queuedWork) {
+    this.queuedWorkByPeerName = queuedWork;
+  }
+
+  protected int getMaxQueueSize() {
+    return maxQueueSize;
+  }
 
+  protected void setMaxQueueSize(int maxQueueSize) {
+    this.maxQueueSize = maxQueueSize;
   }
 
+  protected ZooCache getZooCache() {
+    return zooCache;
+  }
+
+  protected void setZooCache(ZooCache zooCache) {
+    this.zooCache = zooCache;
+  }
+
+  /**
+   * Initialize the DistributedWorkQueue using the proper ZK location
+   * 
+   * @param conf
+   */
+  protected void initializeWorkQueue(AccumuloConfiguration conf) {
+    workQueue = new DistributedWorkQueue(ZooUtil.getRoot(conn.getInstance()) + Constants.ZREPLICATION_WORK_QUEUE, conf);
+  }
+
+  /**
+   * Initialize the queuedWork set with the work already sent out
+   */
+  protected void initializeQueuedWork() {
+    Preconditions.checkArgument(null == queuedWorkByPeerName, "Expected queuedWork to be null");
+    queuedWorkByPeerName = new HashMap<>();
+    List<String> existingWork;
+    try {
+      existingWork = workQueue.getWorkQueued();
+    } catch (KeeperException | InterruptedException e) {
+      throw new RuntimeException("Error reading existing queued replication work", e);
+    }
+
+    log.info("Restoring replication work queue state from zookeeper");
+
+    for (String work : existingWork) {
+      Entry<String,ReplicationTarget> entry = fromQueueKey(work);
+      String filename = entry.getKey();
+      String peerName = entry.getValue().getPeerName();
+      String sourceTableId = entry.getValue().getSourceTableId();
+
+      log.debug("In progress replication of {} from table with ID {} to peer {}", filename, sourceTableId, peerName);
+
+      Map<String,String> replicationForPeer = queuedWorkByPeerName.get(peerName);
+      if (null == replicationForPeer) {
+        replicationForPeer = new HashMap<>();
+        queuedWorkByPeerName.put(peerName, replicationForPeer);
+      }
+
+      replicationForPeer.put(sourceTableId, work);
+    }
+  }
+
+  /**
+   * Scan over the {@link WorkSection} of the replication table adding work for entries that have data to replicate and have not already been queued.
+   */
+  protected void createWork() {
+    // Create a scanner over the replication table's order entries
+    Scanner s;
+    try {
+      s = ReplicationTable.getScanner(conn);
+    } catch (TableNotFoundException e) {
+      UtilWaitThread.sleep(1000);
+      return;
+    }
+
+    OrderSection.limit(s);
+
+    Text buffer = new Text();
+    for (Entry<Key,Value> entry : s) {
+      // If we're not working off the entries, we need to not shoot ourselves in the foot by continuing
+      // to add more work entries
+      if (queuedWorkByPeerName.size() > maxQueueSize) {
+        log.warn("Queued replication work exceeds configured maximum ({}), sleeping to allow work to occur", maxQueueSize);
+        UtilWaitThread.sleep(5000);
+        return;
+      }
+
+      String file = OrderSection.getFile(entry.getKey(), buffer);
+      OrderSection.getTableId(entry.getKey(), buffer);
+      String sourceTableId = buffer.toString();
+
+      Scanner workScanner;
+      try {
+        workScanner = ReplicationTable.getScanner(conn);
+      } catch (TableNotFoundException e) {
+        log.warn("Replication table was deleted. Will retry...");
+        UtilWaitThread.sleep(5000);
+        return;
+      }
+
+      WorkSection.limit(workScanner);
+      workScanner.setRange(Range.exact(file));
+
+      int newReplicationTasksSubmitted = 0;
+      // For a file, we can concurrently replicate it to multiple targets
+      for (Entry<Key,Value> workEntry : workScanner) {
+        Status status;
+        try {
+          status = StatusUtil.fromValue(workEntry.getValue());
+        } catch (InvalidProtocolBufferException e) {
+          log.warn("Could not deserialize protobuf from work entry for {} to {}, will retry", file,
+              ReplicationTarget.from(workEntry.getKey().getColumnQualifier()), e);
+          continue;
+        }
+
+        // Get the ReplicationTarget for this Work record
+        ReplicationTarget target = WorkSection.getTarget(entry.getKey(), buffer);
+
+        Map<String,String> queuedWorkForPeer = queuedWorkByPeerName.get(target.getPeerName());
+        if (null == queuedWorkForPeer) {
+          queuedWorkForPeer = new HashMap<>();
+          queuedWorkByPeerName.put(target.getPeerName(), queuedWorkForPeer);
+        }
+
+        // If there is work to do
+        if (isWorkRequired(status)) {
+          Path p = new Path(file);
+          String filename = p.getName();
+          String key = getQueueKey(filename, target);
+
+          // Get the file (if any) currently being replicated to the given peer for the given source table
+          String fileBeingReplicated = queuedWorkForPeer.get(sourceTableId);
+
+          if (null == fileBeingReplicated) {
+            // If there is no file, submit this one for replication
+            newReplicationTasksSubmitted += queueWork(key, file, sourceTableId, queuedWorkForPeer);
+          } else {
+            log.debug("Not queueing {} for work as {} must be replicated to {} first", file, fileBeingReplicated, target.getPeerName());
+          }
+        } else {
+          log.debug("Not queueing work for {} because [{}] doesn't need replication", file, ProtobufUtil.toString(status));
+        }
+      }
+
+      log.info("Assigned {} replication work entries for {}", newReplicationTasksSubmitted, file);
+    }
+  }
+
+  /**
+   * Distribute the work for the given path with filename
+   * 
+   * @param key
+   *          Unique key to identify this work in the queue
+   * @param path
+   *          Full path to a file
+   */
+  protected int queueWork(String key, String path, String sourceTableId, Map<String,String> queuedWorkForPeer) {
+    try {
+      log.debug("Queued work for {} and {} from table ID {}", key, path, sourceTableId);
+
+      workQueue.addWork(key, path);      
+      queuedWorkForPeer.put(sourceTableId, key);
+
+      return 1;
+    } catch (KeeperException | InterruptedException e) {
+      log.warn("Could not queue work for {}", path, e);
+      return 0;
+    }
+  }
+
+  /**
+   * Iterate over the queued work to remove entries that have been completed.
+   */
+  protected void cleanupFinishedWork() {
+    final Iterator<Entry<String,Map<String,String>>> queuedWork = queuedWorkByPeerName.entrySet().iterator();
+    final String instanceId = conn.getInstance().getInstanceID();
+
+    // Check the status of all the work we've queued up
+    while (queuedWork.hasNext()) {
+      // {peer -> {tableId -> workKey, tableId -> workKey, ... }, peer -> ...}
+      Entry<String,Map<String,String>> workForPeer = queuedWork.next();
+
+      // TableID to workKey (filename and ReplicationTarget)
+      Map<String,String> queuedReplication = workForPeer.getValue();
+
+      Iterator<Entry<String,String>> iter = queuedReplication.entrySet().iterator();
+      // Loop over every target we need to replicate this file to, removing the target when
+      // the replication task has finished
+      while (iter.hasNext()) {
+        // tableID -> workKey
+        Entry<String,String> entry = iter.next();
+        // Null equates to the work for this target was finished
+        if (null == zooCache.get(ZooUtil.getRoot(instanceId) + Constants.ZREPLICATION_WORK_QUEUE + "/" + entry.getValue())) {
+          iter.remove();
+        }
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/a59692dc/server/master/src/main/java/org/apache/accumulo/master/replication/WorkAssigner.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/WorkAssigner.java b/server/master/src/main/java/org/apache/accumulo/master/replication/WorkAssigner.java
deleted file mode 100644
index 9e47fc3..0000000
--- a/server/master/src/main/java/org/apache/accumulo/master/replication/WorkAssigner.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.master.replication;
-
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.conf.AccumuloConfiguration;
-
-
-/**
- * Interface to allow for multiple implementations that assign replication work
- */
-public interface WorkAssigner {
-
-  /**
-   * @return The name for this WorkAssigner
-   */
-  public String getName();
-
-  /**
-   * Configure the WorkAssigner implementation
-   */
-  public void configure(AccumuloConfiguration conf, Connector conn);
-
-  /**
-   * Assign work for replication
-   */
-  public void assignWork();
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/a59692dc/server/master/src/main/java/org/apache/accumulo/master/replication/WorkDriver.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/WorkDriver.java b/server/master/src/main/java/org/apache/accumulo/master/replication/WorkDriver.java
index bbe8f93..1b70a13 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/replication/WorkDriver.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/replication/WorkDriver.java
@@ -22,6 +22,7 @@ import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.util.Daemon;
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.master.Master;
+import org.apache.accumulo.server.replication.WorkAssigner;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/a59692dc/server/master/src/test/java/org/apache/accumulo/master/replication/AbstractWorkAssignerTest.java
----------------------------------------------------------------------
diff --git a/server/master/src/test/java/org/apache/accumulo/master/replication/AbstractWorkAssignerTest.java b/server/master/src/test/java/org/apache/accumulo/master/replication/AbstractWorkAssignerTest.java
new file mode 100644
index 0000000..f4e60e5
--- /dev/null
+++ b/server/master/src/test/java/org/apache/accumulo/master/replication/AbstractWorkAssignerTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.master.replication;
+
+import java.util.Map.Entry;
+import java.util.UUID;
+
+import org.apache.accumulo.core.replication.ReplicationTarget;
+import org.apache.accumulo.server.replication.AbstractWorkAssigner;
+import org.apache.hadoop.fs.Path;
+import org.apache.zookeeper.common.PathUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * 
+ */
+public class AbstractWorkAssignerTest {
+
+  @Test
+  public void createsValidZKNodeName() {
+    Path p = new Path("/accumulo/wals/tserver+port/" + UUID.randomUUID().toString());
+    ReplicationTarget target = new ReplicationTarget("cluster1", "table1", "1");
+
+    String key = AbstractWorkAssigner.getQueueKey(p.toString(), target);
+    
+    PathUtils.validatePath(key);
+  }
+
+  @Test
+  public void queueKeySerialization() {
+    Path p = new Path("/accumulo/wals/tserver+port/" + UUID.randomUUID().toString());
+    ReplicationTarget target = new ReplicationTarget("cluster1", "table1", "1");
+
+    String key = AbstractWorkAssigner.getQueueKey(p.toString(), target);
+
+    Entry<String,ReplicationTarget> result = AbstractWorkAssigner.fromQueueKey(key);
+    Assert.assertEquals(p.toString(), result.getKey());
+    Assert.assertEquals(target, result.getValue());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/a59692dc/server/master/src/test/java/org/apache/accumulo/master/replication/DistributedWorkQueueWorkAssignerTest.java
----------------------------------------------------------------------
diff --git a/server/master/src/test/java/org/apache/accumulo/master/replication/DistributedWorkQueueWorkAssignerTest.java b/server/master/src/test/java/org/apache/accumulo/master/replication/DistributedWorkQueueWorkAssignerTest.java
index 450b426..2048195 100644
--- a/server/master/src/test/java/org/apache/accumulo/master/replication/DistributedWorkQueueWorkAssignerTest.java
+++ b/server/master/src/test/java/org/apache/accumulo/master/replication/DistributedWorkQueueWorkAssignerTest.java
@@ -37,13 +37,15 @@ import org.apache.accumulo.core.client.mock.MockInstance;
 import org.apache.accumulo.core.client.security.tokens.PasswordToken;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.protobuf.ProtobufUtil;
+import org.apache.accumulo.core.replication.ReplicationSchema.OrderSection;
 import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
 import org.apache.accumulo.core.replication.ReplicationTarget;
 import org.apache.accumulo.core.replication.StatusUtil;
+import org.apache.accumulo.core.replication.proto.Replication.Status;
 import org.apache.accumulo.core.security.Credentials;
 import org.apache.accumulo.core.security.TablePermission;
 import org.apache.accumulo.server.replication.ReplicationTable;
-import org.apache.accumulo.server.replication.ReplicationWorkAssignerHelper;
 import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
 import org.apache.accumulo.server.zookeeper.ZooCache;
 import org.apache.hadoop.fs.Path;
@@ -299,4 +301,73 @@ public class DistributedWorkQueueWorkAssignerTest {
 
     verify(workQueue);
   }
+
+  @Test
+  public void createWorkForFilesInCorrectOrder() throws Exception {
+    ReplicationTarget target = new ReplicationTarget("cluster1", "table1", "1");
+    Text serializedTarget = target.toText();
+    String keyTarget = target.getPeerName() + AbstractWorkAssigner.KEY_SEPARATOR + target.getRemoteIdentifier()
+        + AbstractWorkAssigner.KEY_SEPARATOR + target.getSourceTableId();
+
+    MockInstance inst = new MockInstance(test.getMethodName());
+    Credentials creds = new Credentials("root", new PasswordToken(""));
+    Connector conn = inst.getConnector(creds.getPrincipal(), creds.getToken());
+
+    // Set the connector
+    assigner.setConnector(conn);
+
+    // Create and grant ourselves write to the replication table
+    ReplicationTable.create(conn);
+    conn.securityOperations().grantTablePermission("root", ReplicationTable.NAME, TablePermission.WRITE);
+
+    // Create two mutations, both of which need replication work done
+    BatchWriter bw = ReplicationTable.getBatchWriter(conn);
+    // We want the name of file2 to sort before file1
+    String filename1 = "z_file1", filename2 = "a_file1";
+    String file1 = "/accumulo/wal/tserver+port/" + filename1, file2 = "/accumulo/wal/tserver+port/" + filename2;
+
+    // File1 was closed before file2, however
+    Status stat1 = Status.newBuilder().setBegin(0).setEnd(100).setClosed(true).setInfiniteEnd(false).setClosedTime(250).build();
+    Status stat2 = Status.newBuilder().setBegin(0).setEnd(100).setClosed(true).setInfiniteEnd(false).setClosedTime(500).build();
+
+    Mutation m = new Mutation(file1);
+    WorkSection.add(m, serializedTarget, ProtobufUtil.toValue(stat1));
+    bw.addMutation(m);
+
+    m = new Mutation(file2);
+    WorkSection.add(m, serializedTarget, ProtobufUtil.toValue(stat2));
+    bw.addMutation(m);
+
+    m = OrderSection.createMutation(file1, stat1.getClosedTime());
+    OrderSection.add(m, new Text(target.getSourceTableId()), ProtobufUtil.toValue(stat1));
+    bw.addMutation(m);
+
+    m = OrderSection.createMutation(file2, stat2.getClosedTime());
+    OrderSection.add(m, new Text(target.getSourceTableId()), ProtobufUtil.toValue(stat2));
+    bw.addMutation(m);
+
+    bw.close();
+
+    DistributedWorkQueue workQueue = createMock(DistributedWorkQueue.class);
+    @SuppressWarnings("unchecked")
+    HashSet<String> queuedWork = createMock(HashSet.class);
+    assigner.setQueuedWork(queuedWork);
+    assigner.setWorkQueue(workQueue);
+    assigner.setMaxQueueSize(Integer.MAX_VALUE);
+
+    expect(queuedWork.size()).andReturn(0).anyTimes();
+
+    // Make sure we expect the invocations in the correct order (accumulo is sorted)
+    expect(queuedWork.contains(filename1 + "|" + keyTarget)).andReturn(false);
+    workQueue.addWork(filename1 + "|" + keyTarget, file1);
+    expectLastCall().once();
+
+    // file2 is *not* queued because file1 must be replicated first
+
+    replay(queuedWork, workQueue);
+
+    assigner.createWork();
+
+    verify(queuedWork, workQueue);
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/a59692dc/server/master/src/test/java/org/apache/accumulo/master/replication/SequentialWorkAssignerTest.java
----------------------------------------------------------------------
diff --git a/server/master/src/test/java/org/apache/accumulo/master/replication/SequentialWorkAssignerTest.java b/server/master/src/test/java/org/apache/accumulo/master/replication/SequentialWorkAssignerTest.java
new file mode 100644
index 0000000..b7c6e83
--- /dev/null
+++ b/server/master/src/test/java/org/apache/accumulo/master/replication/SequentialWorkAssignerTest.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.master.replication;
+
+import static org.easymock.EasyMock.createMock;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
+import static org.junit.Assert.fail;
+
+import java.util.HashSet;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.mock.MockInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.protobuf.ProtobufUtil;
+import org.apache.accumulo.core.replication.ReplicationSchema.OrderSection;
+import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
+import org.apache.accumulo.core.replication.ReplicationTarget;
+import org.apache.accumulo.core.replication.proto.Replication.Status;
+import org.apache.accumulo.core.security.Credentials;
+import org.apache.accumulo.core.security.TablePermission;
+import org.apache.accumulo.server.replication.AbstractWorkAssigner;
+import org.apache.accumulo.server.replication.ReplicationTable;
+import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
+import org.apache.hadoop.io.Text;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+/**
+ * 
+ */
+public class SequentialWorkAssignerTest {
+
+  @Rule
+  public TestName test = new TestName();
+
+  private AccumuloConfiguration conf;
+  private Connector conn;
+  private SequentialWorkAssigner assigner;
+
+  @Before
+  public void init() {
+    conf = createMock(AccumuloConfiguration.class);
+    conn = createMock(Connector.class);
+    assigner = new SequentialWorkAssigner(conf, conn);
+  }
+
+  @Test
+  public void test() {
+    fail("Not yet implemented");
+  }
+
+//  @Test
+  public void createWorkForFilesInCorrectOrder() throws Exception {
+    ReplicationTarget target = new ReplicationTarget("cluster1", "table1", "1");
+    Text serializedTarget = target.toText();
+    String keyTarget = target.getPeerName() + AbstractWorkAssigner.KEY_SEPARATOR + target.getRemoteIdentifier()
+        + AbstractWorkAssigner.KEY_SEPARATOR + target.getSourceTableId();
+
+    MockInstance inst = new MockInstance(test.getMethodName());
+    Credentials creds = new Credentials("root", new PasswordToken(""));
+    Connector conn = inst.getConnector(creds.getPrincipal(), creds.getToken());
+
+    // Set the connector
+    assigner.setConnector(conn);
+
+    // Create and grant ourselves write to the replication table
+    ReplicationTable.create(conn);
+    conn.securityOperations().grantTablePermission("root", ReplicationTable.NAME, TablePermission.WRITE);
+
+    // Create two mutations, both of which need replication work done
+    BatchWriter bw = ReplicationTable.getBatchWriter(conn);
+    // We want the name of file2 to sort before file1
+    String filename1 = "z_file1", filename2 = "a_file1";
+    String file1 = "/accumulo/wal/tserver+port/" + filename1, file2 = "/accumulo/wal/tserver+port/" + filename2;
+
+    // File1 was closed before file2, however
+    Status stat1 = Status.newBuilder().setBegin(0).setEnd(100).setClosed(true).setInfiniteEnd(false).setClosedTime(250).build();
+    Status stat2 = Status.newBuilder().setBegin(0).setEnd(100).setClosed(true).setInfiniteEnd(false).setClosedTime(500).build();
+
+    Mutation m = new Mutation(file1);
+    WorkSection.add(m, serializedTarget, ProtobufUtil.toValue(stat1));
+    bw.addMutation(m);
+
+    m = new Mutation(file2);
+    WorkSection.add(m, serializedTarget, ProtobufUtil.toValue(stat2));
+    bw.addMutation(m);
+
+    m = OrderSection.createMutation(file1, stat1.getClosedTime());
+    OrderSection.add(m, new Text(target.getSourceTableId()), ProtobufUtil.toValue(stat1));
+    bw.addMutation(m);
+
+    m = OrderSection.createMutation(file2, stat2.getClosedTime());
+    OrderSection.add(m, new Text(target.getSourceTableId()), ProtobufUtil.toValue(stat2));
+    bw.addMutation(m);
+
+    bw.close();
+
+    DistributedWorkQueue workQueue = createMock(DistributedWorkQueue.class);
+    @SuppressWarnings("unchecked")
+    HashSet<String> queuedWork = createMock(HashSet.class);
+    assigner.setQueuedWork(queuedWork);
+    assigner.setWorkQueue(workQueue);
+    assigner.setMaxQueueSize(Integer.MAX_VALUE);
+
+    expect(queuedWork.size()).andReturn(0).anyTimes();
+
+    // Make sure we expect the invocations in the correct order (accumulo is sorted)
+    expect(queuedWork.contains(filename1 + "|" + keyTarget)).andReturn(false);
+    workQueue.addWork(filename1 + "|" + keyTarget, file1);
+    expectLastCall().once();
+
+    // file2 is *not* queued because file1 must be replicated first
+
+    replay(queuedWork, workQueue);
+
+    assigner.createWork();
+
+    verify(queuedWork, workQueue);
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/a59692dc/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java
index 3a0da79..2b8496c 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java
@@ -40,8 +40,8 @@ import org.apache.accumulo.core.replication.proto.Replication.Status;
 import org.apache.accumulo.core.security.Credentials;
 import org.apache.accumulo.server.conf.ServerConfiguration;
 import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.replication.AbstractWorkAssigner;
 import org.apache.accumulo.server.replication.ReplicationTable;
-import org.apache.accumulo.server.replication.ReplicationWorkAssignerHelper;
 import org.apache.accumulo.server.zookeeper.DistributedWorkQueue.Processor;
 import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
@@ -76,7 +76,7 @@ public class ReplicationProcessor implements Processor {
 
   @Override
   public void process(String workID, byte[] data) {
-    ReplicationTarget target = ReplicationWorkAssignerHelper.fromQueueKey(workID).getValue();
+    ReplicationTarget target = AbstractWorkAssigner.fromQueueKey(workID).getValue();
     String file = new String(data);
 
     log.debug("Received replication work for {} to {}", file, target);


Mime
View raw message