accumulo-commits mailing list archives

From: els...@apache.org
Subject: [25/50] [abbrv] git commit: ACCUMULO-2819 Fix a small bug, and add some test cases
Date: Wed, 21 May 2014 01:59:44 GMT
ACCUMULO-2819 Fix a small bug, and add some test cases


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/28274ae8
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/28274ae8
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/28274ae8

Branch: refs/heads/ACCUMULO-378
Commit: 28274ae88edc777d5a3267c4febcec2da6b82344
Parents: a59692d
Author: Josh Elser <elserj@apache.org>
Authored: Fri May 16 21:12:55 2014 -0400
Committer: Josh Elser <elserj@apache.org>
Committed: Fri May 16 21:12:55 2014 -0400

----------------------------------------------------------------------
 .../replication/SequentialWorkAssigner.java     |   8 +-
 .../DistributedWorkQueueWorkAssignerTest.java   |  83 +------
 .../replication/SequentialWorkAssignerTest.java | 238 +++++++++++++++++--
 3 files changed, 232 insertions(+), 97 deletions(-)
----------------------------------------------------------------------
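
The "small bug" in the subject line is in SequentialWorkAssigner.createWork(): inside the nested loop over work entries, the ReplicationTarget was parsed from the *order* entry's key rather than the work entry's key, because both loop variables were reachable under similar names. Renaming the outer variable to orderEntry (and referencing workEntry in the inner loop, as the hunk below shows) makes the wrong read impossible. A minimal standalone sketch of that hazard, using hypothetical names rather than the actual Accumulo classes:

    import java.util.AbstractMap.SimpleEntry;
    import java.util.Arrays;
    import java.util.List;
    import java.util.Map.Entry;

    public class ShadowingSketch {
      public static void main(String[] args) {
        // Stand-ins for the order-section and work-section scanners
        List<Entry<String,String>> orderEntries =
            Arrays.<Entry<String,String>>asList(new SimpleEntry<>("order-row", "order-val"));
        List<Entry<String,String>> workEntries =
            Arrays.<Entry<String,String>>asList(new SimpleEntry<>("work-row", "work-val"));

        for (Entry<String,String> orderEntry : orderEntries) {
          for (Entry<String,String> workEntry : workEntries) {
            // With both variables named "entry", the buggy version read the outer
            // (order) entry's key here; distinct names make the intent explicit.
            System.out.println("target parsed from: " + workEntry.getKey());
          }
        }
      }
    }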


http://git-wip-us.apache.org/repos/asf/accumulo/blob/28274ae8/server/master/src/main/java/org/apache/accumulo/master/replication/SequentialWorkAssigner.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/SequentialWorkAssigner.java
b/server/master/src/main/java/org/apache/accumulo/master/replication/SequentialWorkAssigner.java
index e295ef7..67b652b 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/replication/SequentialWorkAssigner.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/replication/SequentialWorkAssigner.java
@@ -228,7 +228,7 @@ public class SequentialWorkAssigner extends AbstractWorkAssigner {
     OrderSection.limit(s);
 
     Text buffer = new Text();
-    for (Entry<Key,Value> entry : s) {
+    for (Entry<Key,Value> orderEntry : s) {
       // If we're not working off the entries, we need to not shoot ourselves in the foot by continuing
       // to add more work entries
       if (queuedWorkByPeerName.size() > maxQueueSize) {
@@ -237,8 +237,8 @@ public class SequentialWorkAssigner extends AbstractWorkAssigner {
         return;
       }
 
-      String file = OrderSection.getFile(entry.getKey(), buffer);
-      OrderSection.getTableId(entry.getKey(), buffer);
+      String file = OrderSection.getFile(orderEntry.getKey(), buffer);
+      OrderSection.getTableId(orderEntry.getKey(), buffer);
       String sourceTableId = buffer.toString();
 
       Scanner workScanner;
@@ -266,7 +266,7 @@ public class SequentialWorkAssigner extends AbstractWorkAssigner {
         }
 
         // Get the ReplicationTarget for this Work record
-        ReplicationTarget target = WorkSection.getTarget(entry.getKey(), buffer);
+        ReplicationTarget target = WorkSection.getTarget(workEntry.getKey(), buffer);
 
         Map<String,String> queuedWorkForPeer = queuedWorkByPeerName.get(target.getPeerName());
         if (null == queuedWorkForPeer) {
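
The structure being manipulated here, queuedWorkByPeerName, maps peer name to a map of source table id to the queue key of the file currently in flight, and is what enforces the per-(peer, table) sequential guarantee. A sketch of that bookkeeping, inferred from this hunk and the test assertions below rather than taken from the class itself:

    import java.util.HashMap;
    import java.util.Map;

    public class QueuedWorkSketch {
      // peer name -> (source table id -> queue key of the file currently in flight)
      private final Map<String,Map<String,String>> queuedWorkByPeerName = new HashMap<>();

      /** Queue the file only if nothing is already in flight for this peer/table pair. */
      public boolean tryQueue(String peer, String sourceTableId, String queueKey) {
        Map<String,String> queuedWorkForPeer = queuedWorkByPeerName.get(peer);
        if (null == queuedWorkForPeer) {
          queuedWorkForPeer = new HashMap<>();
          queuedWorkByPeerName.put(peer, queuedWorkForPeer);
        }
        if (queuedWorkForPeer.containsKey(sourceTableId)) {
          return false; // an earlier file for this table must finish replicating first
        }
        queuedWorkForPeer.put(sourceTableId, queueKey);
        return true;
      }
    }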

http://git-wip-us.apache.org/repos/asf/accumulo/blob/28274ae8/server/master/src/test/java/org/apache/accumulo/master/replication/DistributedWorkQueueWorkAssignerTest.java
----------------------------------------------------------------------
diff --git a/server/master/src/test/java/org/apache/accumulo/master/replication/DistributedWorkQueueWorkAssignerTest.java
b/server/master/src/test/java/org/apache/accumulo/master/replication/DistributedWorkQueueWorkAssignerTest.java
index 2048195..46c5691 100644
--- a/server/master/src/test/java/org/apache/accumulo/master/replication/DistributedWorkQueueWorkAssignerTest.java
+++ b/server/master/src/test/java/org/apache/accumulo/master/replication/DistributedWorkQueueWorkAssignerTest.java
@@ -37,14 +37,12 @@ import org.apache.accumulo.core.client.mock.MockInstance;
 import org.apache.accumulo.core.client.security.tokens.PasswordToken;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.protobuf.ProtobufUtil;
-import org.apache.accumulo.core.replication.ReplicationSchema.OrderSection;
 import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
 import org.apache.accumulo.core.replication.ReplicationTarget;
 import org.apache.accumulo.core.replication.StatusUtil;
-import org.apache.accumulo.core.replication.proto.Replication.Status;
 import org.apache.accumulo.core.security.Credentials;
 import org.apache.accumulo.core.security.TablePermission;
+import org.apache.accumulo.server.replication.AbstractWorkAssigner;
 import org.apache.accumulo.server.replication.ReplicationTable;
 import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
 import org.apache.accumulo.server.zookeeper.ZooCache;
@@ -118,9 +116,9 @@ public class DistributedWorkQueueWorkAssignerTest {
   public void createWorkForFilesNeedingIt() throws Exception {
     ReplicationTarget target1 = new ReplicationTarget("cluster1", "table1", "1"), target2 = new ReplicationTarget("cluster1", "table2", "2");
     Text serializedTarget1 = target1.toText(), serializedTarget2 = target2.toText();
-    String keyTarget1 = target1.getPeerName() + ReplicationWorkAssignerHelper.KEY_SEPARATOR + target1.getRemoteIdentifier()
-        + ReplicationWorkAssignerHelper.KEY_SEPARATOR + target1.getSourceTableId(), keyTarget2 = target2.getPeerName()
-        + ReplicationWorkAssignerHelper.KEY_SEPARATOR + target2.getRemoteIdentifier() + ReplicationWorkAssignerHelper.KEY_SEPARATOR
+    String keyTarget1 = target1.getPeerName() + AbstractWorkAssigner.KEY_SEPARATOR + target1.getRemoteIdentifier()
+        + AbstractWorkAssigner.KEY_SEPARATOR + target1.getSourceTableId(), keyTarget2 = target2.getPeerName()
+        + AbstractWorkAssigner.KEY_SEPARATOR + target2.getRemoteIdentifier() + AbstractWorkAssigner.KEY_SEPARATOR
         + target2.getSourceTableId();
 
     MockInstance inst = new MockInstance(test.getMethodName());
@@ -266,8 +264,8 @@ public class DistributedWorkQueueWorkAssignerTest {
     assigner.setQueuedWork(queuedWork);
 
     ReplicationTarget target = new ReplicationTarget("cluster1", "table1", "1");
-    String serializedTarget = target.getPeerName() + ReplicationWorkAssignerHelper.KEY_SEPARATOR + target.getRemoteIdentifier()
-        + ReplicationWorkAssignerHelper.KEY_SEPARATOR + target.getSourceTableId();
+    String serializedTarget = target.getPeerName() + AbstractWorkAssigner.KEY_SEPARATOR + target.getRemoteIdentifier()
+        + AbstractWorkAssigner.KEY_SEPARATOR + target.getSourceTableId();
 
     queuedWork.add("wal1|" + serializedTarget.toString());
 
@@ -301,73 +299,4 @@ public class DistributedWorkQueueWorkAssignerTest {
 
     verify(workQueue);
   }
-
-  @Test
-  public void createWorkForFilesInCorrectOrder() throws Exception {
-    ReplicationTarget target = new ReplicationTarget("cluster1", "table1", "1");
-    Text serializedTarget = target.toText();
-    String keyTarget = target.getPeerName() + ReplicationWorkAssignerHelper.KEY_SEPARATOR + target.getRemoteIdentifier()
-        + ReplicationWorkAssignerHelper.KEY_SEPARATOR + target.getSourceTableId();
-
-    MockInstance inst = new MockInstance(test.getMethodName());
-    Credentials creds = new Credentials("root", new PasswordToken(""));
-    Connector conn = inst.getConnector(creds.getPrincipal(), creds.getToken());
-
-    // Set the connector
-    assigner.setConnector(conn);
-
-    // Create and grant ourselves write to the replication table
-    ReplicationTable.create(conn);
-    conn.securityOperations().grantTablePermission("root", ReplicationTable.NAME, TablePermission.WRITE);
-
-    // Create two mutations, both of which need replication work done
-    BatchWriter bw = ReplicationTable.getBatchWriter(conn);
-    // We want the name of file2 to sort before file1
-    String filename1 = "z_file1", filename2 = "a_file1";
-    String file1 = "/accumulo/wal/tserver+port/" + filename1, file2 = "/accumulo/wal/tserver+port/" + filename2;
-
-    // File1 was closed before file2, however
-    Status stat1 = Status.newBuilder().setBegin(0).setEnd(100).setClosed(true).setInfiniteEnd(false).setClosedTime(250).build();
-    Status stat2 = Status.newBuilder().setBegin(0).setEnd(100).setClosed(true).setInfiniteEnd(false).setClosedTime(500).build();
-
-    Mutation m = new Mutation(file1);
-    WorkSection.add(m, serializedTarget, ProtobufUtil.toValue(stat1));
-    bw.addMutation(m);
-
-    m = new Mutation(file2);
-    WorkSection.add(m, serializedTarget, ProtobufUtil.toValue(stat2));
-    bw.addMutation(m);
-
-    m = OrderSection.createMutation(file1, stat1.getClosedTime());
-    OrderSection.add(m, new Text(target.getSourceTableId()), ProtobufUtil.toValue(stat1));
-    bw.addMutation(m);
-
-    m = OrderSection.createMutation(file2, stat2.getClosedTime());
-    OrderSection.add(m, new Text(target.getSourceTableId()), ProtobufUtil.toValue(stat2));
-    bw.addMutation(m);
-
-    bw.close();
-
-    DistributedWorkQueue workQueue = createMock(DistributedWorkQueue.class);
-    @SuppressWarnings("unchecked")
-    HashSet<String> queuedWork = createMock(HashSet.class);
-    assigner.setQueuedWork(queuedWork);
-    assigner.setWorkQueue(workQueue);
-    assigner.setMaxQueueSize(Integer.MAX_VALUE);
-
-    expect(queuedWork.size()).andReturn(0).anyTimes();
-
-    // Make sure we expect the invocations in the correct order (accumulo is sorted)
-    expect(queuedWork.contains(filename1 + "|" + keyTarget)).andReturn(false);
-    workQueue.addWork(filename1 + "|" + keyTarget, file1);
-    expectLastCall().once();
-
-    // file2 is *not* queued because file1 must be replicated first
-
-    replay(queuedWork, workQueue);
-
-    assigner.createWork();
-
-    verify(queuedWork, workQueue);
-  }
 }
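
Both test classes follow the same EasyMock record/replay/verify lifecycle around a mocked DistributedWorkQueue. A self-contained sketch of that pattern, using a hypothetical WorkQueue interface in place of the real class (the file|peer|remoteId|sourceTableId queue-key shape is an assumption based on the KEY_SEPARATOR usage above):

    import static org.easymock.EasyMock.createMock;
    import static org.easymock.EasyMock.expectLastCall;
    import static org.easymock.EasyMock.replay;
    import static org.easymock.EasyMock.verify;

    public class MockLifecycleSketch {
      // Hypothetical stand-in for DistributedWorkQueue
      interface WorkQueue {
        void addWork(String queueKey, String path);
      }

      public static void main(String[] args) {
        WorkQueue queue = createMock(WorkQueue.class);

        // Record phase: this exact call is expected exactly once
        queue.addWork("z_file1|cluster1|table1|1", "/accumulo/wal/tserver+port/z_file1");
        expectLastCall().once();

        replay(queue); // switch the mock from recording to replay mode

        // The code under test would run here; we stand in for it directly
        queue.addWork("z_file1|cluster1|table1|1", "/accumulo/wal/tserver+port/z_file1");

        verify(queue); // fails if an expected call never happened
      }
    }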

http://git-wip-us.apache.org/repos/asf/accumulo/blob/28274ae8/server/master/src/test/java/org/apache/accumulo/master/replication/SequentialWorkAssignerTest.java
----------------------------------------------------------------------
diff --git a/server/master/src/test/java/org/apache/accumulo/master/replication/SequentialWorkAssignerTest.java
b/server/master/src/test/java/org/apache/accumulo/master/replication/SequentialWorkAssignerTest.java
index b7c6e83..ebc540f 100644
--- a/server/master/src/test/java/org/apache/accumulo/master/replication/SequentialWorkAssignerTest.java
+++ b/server/master/src/test/java/org/apache/accumulo/master/replication/SequentialWorkAssignerTest.java
@@ -21,12 +21,15 @@ import static org.easymock.EasyMock.expect;
 import static org.easymock.EasyMock.expectLastCall;
 import static org.easymock.EasyMock.replay;
 import static org.easymock.EasyMock.verify;
-import static org.junit.Assert.fail;
 
-import java.util.HashSet;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TreeMap;
 
+import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.client.mock.MockInstance;
 import org.apache.accumulo.core.client.security.tokens.PasswordToken;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
@@ -38,10 +41,13 @@ import org.apache.accumulo.core.replication.ReplicationTarget;
 import org.apache.accumulo.core.replication.proto.Replication.Status;
 import org.apache.accumulo.core.security.Credentials;
 import org.apache.accumulo.core.security.TablePermission;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
 import org.apache.accumulo.server.replication.AbstractWorkAssigner;
 import org.apache.accumulo.server.replication.ReplicationTable;
 import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
+import org.apache.accumulo.server.zookeeper.ZooCache;
 import org.apache.hadoop.io.Text;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -67,16 +73,9 @@ public class SequentialWorkAssignerTest {
   }
 
   @Test
-  public void test() {
-    fail("Not yet implemented");
-  }
-
-//  @Test
   public void createWorkForFilesInCorrectOrder() throws Exception {
     ReplicationTarget target = new ReplicationTarget("cluster1", "table1", "1");
     Text serializedTarget = target.toText();
-    String keyTarget = target.getPeerName() + AbstractWorkAssigner.KEY_SEPARATOR + target.getRemoteIdentifier()
-        + AbstractWorkAssigner.KEY_SEPARATOR + target.getSourceTableId();
 
     MockInstance inst = new MockInstance(test.getMethodName());
     Credentials creds = new Credentials("root", new PasswordToken(""));
@@ -118,25 +117,232 @@ public class SequentialWorkAssignerTest {
     bw.close();
 
     DistributedWorkQueue workQueue = createMock(DistributedWorkQueue.class);
-    @SuppressWarnings("unchecked")
-    HashSet<String> queuedWork = createMock(HashSet.class);
+    Map<String,Map<String,String>> queuedWork = new HashMap<>();
     assigner.setQueuedWork(queuedWork);
     assigner.setWorkQueue(workQueue);
     assigner.setMaxQueueSize(Integer.MAX_VALUE);
 
-    expect(queuedWork.size()).andReturn(0).anyTimes();
+    // Make sure we expect the invocations in the correct order (accumulo is sorted)
+    workQueue.addWork(AbstractWorkAssigner.getQueueKey(filename1, target), file1);
+    expectLastCall().once();
+
+    // file2 is *not* queued because file1 must be replicated first
+
+    replay(workQueue);
+
+    assigner.createWork();
+
+    verify(workQueue);
+
+    Assert.assertEquals(1, queuedWork.size());
+    Assert.assertTrue(queuedWork.containsKey("cluster1"));
+    Map<String,String> cluster1Work = queuedWork.get("cluster1");
+    Assert.assertEquals(1, cluster1Work.size());
+    Assert.assertTrue(cluster1Work.containsKey(target.getSourceTableId()));
+    Assert.assertEquals(AbstractWorkAssigner.getQueueKey(filename1, target), cluster1Work.get(target.getSourceTableId()));
+  }
+
+  @Test
+  public void workAcrossTablesHappensConcurrently() throws Exception {
+    ReplicationTarget target1 = new ReplicationTarget("cluster1", "table1", "1");
+    Text serializedTarget1 = target1.toText();
+
+    ReplicationTarget target2 = new ReplicationTarget("cluster1", "table2", "2");
+    Text serializedTarget2 = target2.toText();
+
+    MockInstance inst = new MockInstance(test.getMethodName());
+    Credentials creds = new Credentials("root", new PasswordToken(""));
+    Connector conn = inst.getConnector(creds.getPrincipal(), creds.getToken());
+
+    // Set the connector
+    assigner.setConnector(conn);
+
+    // Create and grant ourselves write to the replication table
+    ReplicationTable.create(conn);
+    conn.securityOperations().grantTablePermission("root", ReplicationTable.NAME, TablePermission.WRITE);
+
+    // Create two mutations, both of which need replication work done
+    BatchWriter bw = ReplicationTable.getBatchWriter(conn);
+    // We want the name of file2 to sort before file1
+    String filename1 = "z_file1", filename2 = "a_file1";
+    String file1 = "/accumulo/wal/tserver+port/" + filename1, file2 = "/accumulo/wal/tserver+port/" + filename2;
+
+    // File1 was closed before file2, however
+    Status stat1 = Status.newBuilder().setBegin(0).setEnd(100).setClosed(true).setInfiniteEnd(false).setClosedTime(250).build();
+    Status stat2 = Status.newBuilder().setBegin(0).setEnd(100).setClosed(true).setInfiniteEnd(false).setClosedTime(500).build();
+
+    Mutation m = new Mutation(file1);
+    WorkSection.add(m, serializedTarget1, ProtobufUtil.toValue(stat1));
+    bw.addMutation(m);
+
+    m = new Mutation(file2);
+    WorkSection.add(m, serializedTarget2, ProtobufUtil.toValue(stat2));
+    bw.addMutation(m);
+
+    m = OrderSection.createMutation(file1, stat1.getClosedTime());
+    OrderSection.add(m, new Text(target1.getSourceTableId()), ProtobufUtil.toValue(stat1));
+    bw.addMutation(m);
+
+    m = OrderSection.createMutation(file2, stat2.getClosedTime());
+    OrderSection.add(m, new Text(target2.getSourceTableId()), ProtobufUtil.toValue(stat2));
+    bw.addMutation(m);
+
+    bw.close();
+
+    DistributedWorkQueue workQueue = createMock(DistributedWorkQueue.class);
+    Map<String,Map<String,String>> queuedWork = new HashMap<>();
+    assigner.setQueuedWork(queuedWork);
+    assigner.setWorkQueue(workQueue);
+    assigner.setMaxQueueSize(Integer.MAX_VALUE);
 
     // Make sure we expect the invocations in the correct order (accumulo is sorted)
-    expect(queuedWork.contains(filename1 + "|" + keyTarget)).andReturn(false);
-    workQueue.addWork(filename1 + "|" + keyTarget, file1);
+    workQueue.addWork(AbstractWorkAssigner.getQueueKey(filename1, target1), file1);
+    expectLastCall().once();
+
+    workQueue.addWork(AbstractWorkAssigner.getQueueKey(filename2, target2), file2);
     expectLastCall().once();
 
     // file2 is also queued because it is for a different source table than file1
 
-    replay(queuedWork, workQueue);
+    replay(workQueue);
 
     assigner.createWork();
 
-    verify(queuedWork, workQueue);
+    verify(workQueue);
+
+    Assert.assertEquals(1, queuedWork.size());
+    Assert.assertTrue(queuedWork.containsKey("cluster1"));
+
+    Map<String,String> cluster1Work = queuedWork.get("cluster1");
+    Assert.assertEquals(2, cluster1Work.size());
+    Assert.assertTrue(cluster1Work.containsKey(target1.getSourceTableId()));
+    Assert.assertEquals(AbstractWorkAssigner.getQueueKey(filename1, target1), cluster1Work.get(target1.getSourceTableId()));
+
+    Assert.assertTrue(cluster1Work.containsKey(target2.getSourceTableId()));
+    Assert.assertEquals(AbstractWorkAssigner.getQueueKey(filename2, target2), cluster1Work.get(target2.getSourceTableId()));
+  }
+
+  @Test
+  public void workAcrossPeersHappensConcurrently() throws Exception {
+    ReplicationTarget target1 = new ReplicationTarget("cluster1", "table1", "1");
+    Text serializedTarget1 = target1.toText();
+
+    ReplicationTarget target2 = new ReplicationTarget("cluster2", "table1", "1");
+    Text serializedTarget2 = target2.toText();
+
+    MockInstance inst = new MockInstance(test.getMethodName());
+    Credentials creds = new Credentials("root", new PasswordToken(""));
+    Connector conn = inst.getConnector(creds.getPrincipal(), creds.getToken());
+
+    // Set the connector
+    assigner.setConnector(conn);
+
+    // Create and grant ourselves write to the replication table
+    ReplicationTable.create(conn);
+    conn.securityOperations().grantTablePermission("root", ReplicationTable.NAME, TablePermission.WRITE);
+
+    // Create two mutations, both of which need replication work done
+    BatchWriter bw = ReplicationTable.getBatchWriter(conn);
+    // We want the name of file2 to sort before file1
+    String filename1 = "z_file1", filename2 = "a_file1";
+    String file1 = "/accumulo/wal/tserver+port/" + filename1, file2 = "/accumulo/wal/tserver+port/" + filename2;
+
+    // File1 was closed before file2, however
+    Status stat1 = Status.newBuilder().setBegin(0).setEnd(100).setClosed(true).setInfiniteEnd(false).setClosedTime(250).build();
+    Status stat2 = Status.newBuilder().setBegin(0).setEnd(100).setClosed(true).setInfiniteEnd(false).setClosedTime(500).build();
+
+    Mutation m = new Mutation(file1);
+    WorkSection.add(m, serializedTarget1, ProtobufUtil.toValue(stat1));
+    bw.addMutation(m);
+
+    m = new Mutation(file2);
+    WorkSection.add(m, serializedTarget2, ProtobufUtil.toValue(stat2));
+    bw.addMutation(m);
+
+    m = OrderSection.createMutation(file1, stat1.getClosedTime());
+    OrderSection.add(m, new Text(target1.getSourceTableId()), ProtobufUtil.toValue(stat1));
+    bw.addMutation(m);
+
+    m = OrderSection.createMutation(file2, stat2.getClosedTime());
+    OrderSection.add(m, new Text(target2.getSourceTableId()), ProtobufUtil.toValue(stat2));
+    bw.addMutation(m);
+
+    bw.close();
+
+    DistributedWorkQueue workQueue = createMock(DistributedWorkQueue.class);
+    Map<String,Map<String,String>> queuedWork = new HashMap<>();
+    assigner.setQueuedWork(queuedWork);
+    assigner.setWorkQueue(workQueue);
+    assigner.setMaxQueueSize(Integer.MAX_VALUE);
+
+    // Make sure we expect the invocations in the correct order (accumulo is sorted)
+    workQueue.addWork(AbstractWorkAssigner.getQueueKey(filename1, target1), file1);
+    expectLastCall().once();
+
+    workQueue.addWork(AbstractWorkAssigner.getQueueKey(filename2, target2), file2);
+    expectLastCall().once();
+
+    // file2 is also queued because it replicates to a different peer (cluster2)
+
+    replay(workQueue);
+
+    assigner.createWork();
+
+    verify(workQueue);
+
+    Assert.assertEquals(2, queuedWork.size());
+    Assert.assertTrue(queuedWork.containsKey("cluster1"));
+
+    Map<String,String> cluster1Work = queuedWork.get("cluster1");
+    Assert.assertEquals(1, cluster1Work.size());
+    Assert.assertTrue(cluster1Work.containsKey(target1.getSourceTableId()));
+    Assert.assertEquals(AbstractWorkAssigner.getQueueKey(filename1, target1), cluster1Work.get(target1.getSourceTableId()));
+
+    Map<String,String> cluster2Work = queuedWork.get("cluster2");
+    Assert.assertEquals(1, cluster2Work.size());
+    Assert.assertTrue(cluster2Work.containsKey(target2.getSourceTableId()));
+    Assert.assertEquals(AbstractWorkAssigner.getQueueKey(filename2, target2), cluster2Work.get(target2.getSourceTableId()));
+  }
+
+  @Test
+  public void basicZooKeeperCleanup() throws Exception {
+    DistributedWorkQueue workQueue = createMock(DistributedWorkQueue.class);
+    ZooCache zooCache = createMock(ZooCache.class);
+    Instance inst = createMock(Instance.class);
+
+    Map<String,Map<String,String>> queuedWork = new TreeMap<>();
+    Map<String,String> cluster1Work = new TreeMap<>();
+
+    // Two files for cluster1: one for table '1' and another for table '2', both with work already assigned
+    cluster1Work.put("1", AbstractWorkAssigner.getQueueKey("file1", new ReplicationTarget("cluster1", "1", "1")));
+    cluster1Work.put("2", AbstractWorkAssigner.getQueueKey("file2", new ReplicationTarget("cluster1", "2", "2")));
+
+    queuedWork.put("cluster1", cluster1Work);
+
+    assigner.setConnector(conn);
+    assigner.setZooCache(zooCache);
+    assigner.setWorkQueue(workQueue);
+    assigner.setQueuedWork(queuedWork);
+
+    expect(conn.getInstance()).andReturn(inst);
+    expect(inst.getInstanceID()).andReturn("instance");
+
+    // file1 replicated
+    expect(
+        zooCache.get(ZooUtil.getRoot("instance") + Constants.ZREPLICATION_WORK_QUEUE + "/"
+            + AbstractWorkAssigner.getQueueKey("file1", new ReplicationTarget("cluster1", "1", "1")))).andReturn(null);
+    // file2 still needs to replicate
+    expect(
+        zooCache.get(ZooUtil.getRoot("instance") + Constants.ZREPLICATION_WORK_QUEUE + "/"
+            + AbstractWorkAssigner.getQueueKey("file2", new ReplicationTarget("cluster1", "2", "2")))).andReturn(new byte[0]);
+
+    replay(workQueue, zooCache, conn, inst);
+
+    assigner.cleanupFinishedWork();
+
+    verify(workQueue, zooCache, conn, inst);
+
+    Assert.assertEquals(1, cluster1Work.size());
+    Assert.assertEquals(AbstractWorkAssigner.getQueueKey("file2", new ReplicationTarget("cluster1", "2", "2")), cluster1Work.get("2"));
   }
 }
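
The basicZooKeeperCleanup test above pins down the cleanup contract: a queued entry is finished once its node has disappeared from the ZooKeeper work queue (the ZooCache lookup returns null), and only finished entries are dropped from queuedWork. A sketch of that logic under those assumptions, with a plain Function standing in for the ZooCache lookup:

    import java.util.Iterator;
    import java.util.Map;
    import java.util.function.Function;

    public class CleanupSketch {
      /** lookup stands in for ZooCache#get on "<zk root>" + ZREPLICATION_WORK_QUEUE + "/" + queueKey. */
      static void cleanupFinishedWork(Map<String,Map<String,String>> queuedWork,
          Function<String,byte[]> lookup) {
        for (Map<String,String> queuedWorkForPeer : queuedWork.values()) {
          Iterator<Map.Entry<String,String>> entries = queuedWorkForPeer.entrySet().iterator();
          while (entries.hasNext()) {
            String queueKey = entries.next().getValue();
            if (null == lookup.apply(queueKey)) {
              entries.remove(); // node gone from ZooKeeper: this file finished replicating
            }
          }
        }
      }
    }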

