Return-Path: X-Original-To: apmail-accumulo-commits-archive@www.apache.org Delivered-To: apmail-accumulo-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 7A822101ED for ; Wed, 28 May 2014 05:08:06 +0000 (UTC) Received: (qmail 19537 invoked by uid 500); 28 May 2014 05:08:06 -0000 Delivered-To: apmail-accumulo-commits-archive@accumulo.apache.org Received: (qmail 19431 invoked by uid 500); 28 May 2014 05:08:06 -0000 Mailing-List: contact commits-help@accumulo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@accumulo.apache.org Delivered-To: mailing list commits@accumulo.apache.org Received: (qmail 19418 invoked by uid 99); 28 May 2014 05:08:06 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 28 May 2014 05:08:06 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 1D1EB94BB7A; Wed, 28 May 2014 05:08:06 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: elserj@apache.org To: commits@accumulo.apache.org Date: Wed, 28 May 2014 05:08:07 -0000 Message-Id: <7610385a3a04405bbc8704672efffbe5@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [2/2] git commit: ACCUMULO-378 (re)Create the UnorderedWorkAssigner which doesn't care about replaying files in any order ACCUMULO-378 (re)Create the UnorderedWorkAssigner which doesn't care about replaying files in any order Increase the default size of the threadpool too. 
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/070ceb1d Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/070ceb1d Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/070ceb1d Branch: refs/heads/ACCUMULO-378 Commit: 070ceb1dacd53ed169ee48af7b72b0788220a941 Parents: 34da6fe Author: Josh Elser Authored: Wed May 28 00:23:34 2014 -0400 Committer: Josh Elser Committed: Wed May 28 00:23:34 2014 -0400 ---------------------------------------------------------------------- .../org/apache/accumulo/core/conf/Property.java | 2 +- .../DistributedWorkQueueWorkAssignerHelper.java | 77 +++ .../DistributedWorkQueueWorkAssigner.java | 50 +- .../replication/SequentialWorkAssigner.java | 5 +- .../replication/UnorderedWorkAssigner.java | 10 +- .../replication/AbstractWorkAssignerTest.java | 55 -- ...tributedWorkQueueWorkAssignerHelperTest.java | 56 ++ .../replication/SequentialWorkAssignerTest.java | 37 +- .../replication/UnorderedWorkAssignerTest.java | 15 +- .../monitor/servlets/ReplicationServlet.java | 4 +- .../replication/ReplicationProcessor.java | 4 +- .../replication/ReplicationProcessorTest.java | 4 +- .../UnorderedWorkAssignerReplicationIT.java | 643 +++++++++++++++++++ 13 files changed, 821 insertions(+), 141 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/070ceb1d/core/src/main/java/org/apache/accumulo/core/conf/Property.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java index 8b24332..6afa956 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@ -475,7 +475,7 @@ public enum Property { @Experimental 
REPLICATION_WORK_ASSIGNMENT_SLEEP("replication.work.assignment.sleep", "30s", PropertyType.TIMEDURATION, "Amount of time to sleep between replication work assignment"), @Experimental - REPLICATION_WORKER_THREADS("replication.worker.threads", "1", PropertyType.COUNT, "Size of the threadpool that each tabletserver devotes to replicating data"), + REPLICATION_WORKER_THREADS("replication.worker.threads", "4", PropertyType.COUNT, "Size of the threadpool that each tabletserver devotes to replicating data"), @Experimental REPLICATION_RECEIPT_SERVICE_PORT("replication.receipt.service.port", "10002", PropertyType.PORT, "Listen port used by thrift service in tserver listening for replication"), @Experimental http://git-wip-us.apache.org/repos/asf/accumulo/blob/070ceb1d/server/base/src/main/java/org/apache/accumulo/server/replication/DistributedWorkQueueWorkAssignerHelper.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/replication/DistributedWorkQueueWorkAssignerHelper.java b/server/base/src/main/java/org/apache/accumulo/server/replication/DistributedWorkQueueWorkAssignerHelper.java new file mode 100644 index 0000000..baa8383 --- /dev/null +++ b/server/base/src/main/java/org/apache/accumulo/server/replication/DistributedWorkQueueWorkAssignerHelper.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.server.replication; + +import java.util.Map.Entry; + +import org.apache.accumulo.core.replication.ReplicationTarget; +import org.apache.accumulo.server.zookeeper.DistributedWorkQueue; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; + +/** + * + */ +public class DistributedWorkQueueWorkAssignerHelper { + + + public static final String KEY_SEPARATOR = "|"; + + /** + * Serialize a filename and a {@link ReplicationTarget} into the expected key format for use with the {@link DistributedWorkQueue} + * + * @param filename + * Filename for data to be replicated + * @param replTarget + * Information about replication peer + * @return Key for identifying work in queue + */ + public static String getQueueKey(String filename, ReplicationTarget replTarget) { + return filename + KEY_SEPARATOR + replTarget.getPeerName() + KEY_SEPARATOR + replTarget.getRemoteIdentifier() + KEY_SEPARATOR + + replTarget.getSourceTableId(); + } + + /** + * @param queueKey + * Key from the work queue + * @return Components which created the queue key + */ + public static Entry fromQueueKey(String queueKey) { + Preconditions.checkNotNull(queueKey); + + int index = queueKey.indexOf(KEY_SEPARATOR); + if (-1 == index) { + throw new IllegalArgumentException("Could not find expected separator in queue key '" + queueKey + "'"); + } + + String filename = queueKey.substring(0, index); + + int secondIndex = queueKey.indexOf(KEY_SEPARATOR, index + 1); + if (-1 == secondIndex) { + throw new IllegalArgumentException("Could not 
find expected separator in queue key '" + queueKey + "'"); + } + + int thirdIndex = queueKey.indexOf(KEY_SEPARATOR, secondIndex + 1); + if (-1 == thirdIndex) { + throw new IllegalArgumentException("Could not find expected seperator in queue key '" + queueKey + "'"); + } + + return Maps.immutableEntry(filename, new ReplicationTarget(queueKey.substring(index + 1, secondIndex), queueKey.substring(secondIndex + 1, thirdIndex), + queueKey.substring(thirdIndex + 1))); + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/070ceb1d/server/master/src/main/java/org/apache/accumulo/master/replication/DistributedWorkQueueWorkAssigner.java ---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/DistributedWorkQueueWorkAssigner.java b/server/master/src/main/java/org/apache/accumulo/master/replication/DistributedWorkQueueWorkAssigner.java index 0fd5205..4815305 100644 --- a/server/master/src/main/java/org/apache/accumulo/master/replication/DistributedWorkQueueWorkAssigner.java +++ b/server/master/src/main/java/org/apache/accumulo/master/replication/DistributedWorkQueueWorkAssigner.java @@ -37,6 +37,7 @@ import org.apache.accumulo.core.replication.StatusUtil; import org.apache.accumulo.core.replication.proto.Replication.Status; import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.accumulo.core.zookeeper.ZooUtil; +import org.apache.accumulo.server.replication.DistributedWorkQueueWorkAssignerHelper; import org.apache.accumulo.server.replication.ReplicationTable; import org.apache.accumulo.server.replication.WorkAssigner; import org.apache.accumulo.server.zookeeper.DistributedWorkQueue; @@ -46,8 +47,6 @@ import org.apache.hadoop.io.Text; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Preconditions; -import com.google.common.collect.Maps; import com.google.protobuf.InvalidProtocolBufferException; /** @@ -66,51 +65,6 @@ public 
abstract class DistributedWorkQueueWorkAssigner implements WorkAssigner { protected int maxQueueSize; protected ZooCache zooCache; - public static final String KEY_SEPARATOR = "|"; - - /** - * Serialize a filename and a {@link ReplicationTarget} into the expected key format for use with the {@link DistributedWorkQueue} - * - * @param filename - * Filename for data to be replicated - * @param replTarget - * Information about replication peer - * @return Key for identifying work in queue - */ - public static String getQueueKey(String filename, ReplicationTarget replTarget) { - return filename + KEY_SEPARATOR + replTarget.getPeerName() + KEY_SEPARATOR + replTarget.getRemoteIdentifier() + KEY_SEPARATOR - + replTarget.getSourceTableId(); - } - - /** - * @param queueKey - * Key from the work queue - * @return Components which created the queue key - */ - public static Entry fromQueueKey(String queueKey) { - Preconditions.checkNotNull(queueKey); - - int index = queueKey.indexOf(KEY_SEPARATOR); - if (-1 == index) { - throw new IllegalArgumentException("Could not find expected separator in queue key '" + queueKey + "'"); - } - - String filename = queueKey.substring(0, index); - - int secondIndex = queueKey.indexOf(KEY_SEPARATOR, index + 1); - if (-1 == secondIndex) { - throw new IllegalArgumentException("Could not find expected separator in queue key '" + queueKey + "'"); - } - - int thirdIndex = queueKey.indexOf(KEY_SEPARATOR, secondIndex + 1); - if (-1 == thirdIndex) { - throw new IllegalArgumentException("Could not find expected seperator in queue key '" + queueKey + "'"); - } - - return Maps.immutableEntry(filename, new ReplicationTarget(queueKey.substring(index + 1, secondIndex), queueKey.substring(secondIndex + 1, thirdIndex), - queueKey.substring(thirdIndex + 1))); - } - /* * Getters/setters for testing purposes */ @@ -252,7 +206,7 @@ public abstract class DistributedWorkQueueWorkAssigner implements WorkAssigner { Path p = new Path(file); String filename = 
p.getName(); - String key = getQueueKey(filename, target); + String key = DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename, target); if (!shouldQueueWork(target)) { if (!isWorkRequired(status) && keysBeingReplicated.contains(key)) { http://git-wip-us.apache.org/repos/asf/accumulo/blob/070ceb1d/server/master/src/main/java/org/apache/accumulo/master/replication/SequentialWorkAssigner.java ---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/SequentialWorkAssigner.java b/server/master/src/main/java/org/apache/accumulo/master/replication/SequentialWorkAssigner.java index e9ed34e..e56763e 100644 --- a/server/master/src/main/java/org/apache/accumulo/master/replication/SequentialWorkAssigner.java +++ b/server/master/src/main/java/org/apache/accumulo/master/replication/SequentialWorkAssigner.java @@ -29,6 +29,7 @@ import org.apache.accumulo.core.client.Connector; import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.replication.ReplicationTarget; import org.apache.accumulo.core.zookeeper.ZooUtil; +import org.apache.accumulo.server.replication.DistributedWorkQueueWorkAssignerHelper; import org.apache.hadoop.fs.Path; import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; @@ -95,7 +96,7 @@ public class SequentialWorkAssigner extends DistributedWorkQueueWorkAssigner { log.info("Restoring replication work queue state from zookeeper"); for (String work : existingWork) { - Entry entry = fromQueueKey(work); + Entry entry = DistributedWorkQueueWorkAssignerHelper.fromQueueKey(work); String filename = entry.getKey(); String peerName = entry.getValue().getPeerName(); String sourceTableId = entry.getValue().getSourceTableId(); @@ -167,7 +168,7 @@ public class SequentialWorkAssigner extends DistributedWorkQueueWorkAssigner { @Override protected boolean queueWork(Path path, ReplicationTarget target) { - String queueKey = 
getQueueKey(path.getName(), target); + String queueKey = DistributedWorkQueueWorkAssignerHelper.getQueueKey(path.getName(), target); Map workForPeer = this.queuedWorkByPeerName.get(target.getPeerName()); if (null == workForPeer) { workForPeer = new HashMap<>(); http://git-wip-us.apache.org/repos/asf/accumulo/blob/070ceb1d/server/master/src/main/java/org/apache/accumulo/master/replication/UnorderedWorkAssigner.java ---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/UnorderedWorkAssigner.java b/server/master/src/main/java/org/apache/accumulo/master/replication/UnorderedWorkAssigner.java index 931b2a5..b6706ef 100644 --- a/server/master/src/main/java/org/apache/accumulo/master/replication/UnorderedWorkAssigner.java +++ b/server/master/src/main/java/org/apache/accumulo/master/replication/UnorderedWorkAssigner.java @@ -26,6 +26,7 @@ import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.replication.ReplicationTarget; import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.accumulo.core.zookeeper.ZooUtil; +import org.apache.accumulo.server.replication.DistributedWorkQueueWorkAssignerHelper; import org.apache.hadoop.fs.Path; import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; @@ -42,8 +43,8 @@ import org.slf4j.LoggerFactory; * throughput. 
*/ public class UnorderedWorkAssigner extends DistributedWorkQueueWorkAssigner { - private static final Logger log = LoggerFactory.getLogger(DistributedWorkQueueWorkAssigner.class); - private static final String NAME = "DistributedWorkQueue Replication Work Assigner"; + private static final Logger log = LoggerFactory.getLogger(UnorderedWorkAssigner.class); + private static final String NAME = "Unordered Work Assigner"; private Set queuedWork; @@ -105,7 +106,7 @@ public class UnorderedWorkAssigner extends DistributedWorkQueueWorkAssigner { */ @Override protected boolean queueWork(Path path, ReplicationTarget target) { - String queueKey = getQueueKey(path.getName(), target); + String queueKey = DistributedWorkQueueWorkAssignerHelper.getQueueKey(path.getName(), target); if (queuedWork.contains(queueKey)) { return false; } @@ -151,7 +152,8 @@ public class UnorderedWorkAssigner extends DistributedWorkQueueWorkAssigner { @Override protected Set getQueuedWork(ReplicationTarget target) { - String desiredQueueKeySuffix = KEY_SEPARATOR + target.getPeerName() + KEY_SEPARATOR + target.getRemoteIdentifier() + KEY_SEPARATOR + String desiredQueueKeySuffix = DistributedWorkQueueWorkAssignerHelper.KEY_SEPARATOR + target.getPeerName() + + DistributedWorkQueueWorkAssignerHelper.KEY_SEPARATOR + target.getRemoteIdentifier() + DistributedWorkQueueWorkAssignerHelper.KEY_SEPARATOR + target.getSourceTableId(); Set queuedWorkForTarget = new HashSet<>(); for (String queuedWork : this.queuedWork) { http://git-wip-us.apache.org/repos/asf/accumulo/blob/070ceb1d/server/master/src/test/java/org/apache/accumulo/master/replication/AbstractWorkAssignerTest.java ---------------------------------------------------------------------- diff --git a/server/master/src/test/java/org/apache/accumulo/master/replication/AbstractWorkAssignerTest.java b/server/master/src/test/java/org/apache/accumulo/master/replication/AbstractWorkAssignerTest.java deleted file mode 100644 index 655c81a..0000000 --- 
a/server/master/src/test/java/org/apache/accumulo/master/replication/AbstractWorkAssignerTest.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.master.replication; - -import java.util.Map.Entry; -import java.util.UUID; - -import org.apache.accumulo.core.replication.ReplicationTarget; -import org.apache.hadoop.fs.Path; -import org.apache.zookeeper.common.PathUtils; -import org.junit.Assert; -import org.junit.Test; - -/** - * - */ -public class AbstractWorkAssignerTest { - - @Test - public void createsValidZKNodeName() { - Path p = new Path("/accumulo/wals/tserver+port/" + UUID.randomUUID().toString()); - ReplicationTarget target = new ReplicationTarget("cluster1", "table1", "1"); - - String key = DistributedWorkQueueWorkAssigner.getQueueKey(p.toString(), target); - - PathUtils.validatePath(key); - } - - @Test - public void queueKeySerialization() { - Path p = new Path("/accumulo/wals/tserver+port/" + UUID.randomUUID().toString()); - ReplicationTarget target = new ReplicationTarget("cluster1", "table1", "1"); - - String key = DistributedWorkQueueWorkAssigner.getQueueKey(p.toString(), target); - - Entry result = 
DistributedWorkQueueWorkAssigner.fromQueueKey(key); - Assert.assertEquals(p.toString(), result.getKey()); - Assert.assertEquals(target, result.getValue()); - } - -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/070ceb1d/server/master/src/test/java/org/apache/accumulo/master/replication/DistributedWorkQueueWorkAssignerHelperTest.java ---------------------------------------------------------------------- diff --git a/server/master/src/test/java/org/apache/accumulo/master/replication/DistributedWorkQueueWorkAssignerHelperTest.java b/server/master/src/test/java/org/apache/accumulo/master/replication/DistributedWorkQueueWorkAssignerHelperTest.java new file mode 100644 index 0000000..feee6d5 --- /dev/null +++ b/server/master/src/test/java/org/apache/accumulo/master/replication/DistributedWorkQueueWorkAssignerHelperTest.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.master.replication; + +import java.util.Map.Entry; +import java.util.UUID; + +import org.apache.accumulo.core.replication.ReplicationTarget; +import org.apache.accumulo.server.replication.DistributedWorkQueueWorkAssignerHelper; +import org.apache.hadoop.fs.Path; +import org.apache.zookeeper.common.PathUtils; +import org.junit.Assert; +import org.junit.Test; + +/** + * + */ +public class DistributedWorkQueueWorkAssignerHelperTest { + + @Test + public void createsValidZKNodeName() { + Path p = new Path("/accumulo/wals/tserver+port/" + UUID.randomUUID().toString()); + ReplicationTarget target = new ReplicationTarget("cluster1", "table1", "1"); + + String key = DistributedWorkQueueWorkAssignerHelper.getQueueKey(p.toString(), target); + + PathUtils.validatePath(key); + } + + @Test + public void queueKeySerialization() { + Path p = new Path("/accumulo/wals/tserver+port/" + UUID.randomUUID().toString()); + ReplicationTarget target = new ReplicationTarget("cluster1", "table1", "1"); + + String key = DistributedWorkQueueWorkAssignerHelper.getQueueKey(p.toString(), target); + + Entry result = DistributedWorkQueueWorkAssignerHelper.fromQueueKey(key); + Assert.assertEquals(p.toString(), result.getKey()); + Assert.assertEquals(target, result.getValue()); + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/070ceb1d/server/master/src/test/java/org/apache/accumulo/master/replication/SequentialWorkAssignerTest.java ---------------------------------------------------------------------- diff --git a/server/master/src/test/java/org/apache/accumulo/master/replication/SequentialWorkAssignerTest.java b/server/master/src/test/java/org/apache/accumulo/master/replication/SequentialWorkAssignerTest.java index 4f56862..e7ff4ca 100644 --- a/server/master/src/test/java/org/apache/accumulo/master/replication/SequentialWorkAssignerTest.java +++ b/server/master/src/test/java/org/apache/accumulo/master/replication/SequentialWorkAssignerTest.java @@ -42,6 
+42,7 @@ import org.apache.accumulo.core.replication.proto.Replication.Status; import org.apache.accumulo.core.security.Credentials; import org.apache.accumulo.core.security.TablePermission; import org.apache.accumulo.core.zookeeper.ZooUtil; +import org.apache.accumulo.server.replication.DistributedWorkQueueWorkAssignerHelper; import org.apache.accumulo.server.replication.ReplicationTable; import org.apache.accumulo.server.zookeeper.DistributedWorkQueue; import org.apache.accumulo.server.zookeeper.ZooCache; @@ -122,7 +123,7 @@ public class SequentialWorkAssignerTest { assigner.setMaxQueueSize(Integer.MAX_VALUE); // Make sure we expect the invocations in the correct order (accumulo is sorted) - workQueue.addWork(DistributedWorkQueueWorkAssigner.getQueueKey(filename1, target), file1); + workQueue.addWork(DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename1, target), file1); expectLastCall().once(); // file2 is *not* queued because file1 must be replicated first @@ -138,7 +139,7 @@ public class SequentialWorkAssignerTest { Map cluster1Work = queuedWork.get("cluster1"); Assert.assertEquals(1, cluster1Work.size()); Assert.assertTrue(cluster1Work.containsKey(target.getSourceTableId())); - Assert.assertEquals(DistributedWorkQueueWorkAssigner.getQueueKey(filename1, target), cluster1Work.get(target.getSourceTableId())); + Assert.assertEquals(DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename1, target), cluster1Work.get(target.getSourceTableId())); } @Test @@ -195,10 +196,10 @@ public class SequentialWorkAssignerTest { assigner.setMaxQueueSize(Integer.MAX_VALUE); // Make sure we expect the invocations in the correct order (accumulo is sorted) - workQueue.addWork(DistributedWorkQueueWorkAssigner.getQueueKey(filename1, target1), file1); + workQueue.addWork(DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename1, target1), file1); expectLastCall().once(); - workQueue.addWork(DistributedWorkQueueWorkAssigner.getQueueKey(filename2, target2), file2); + 
workQueue.addWork(DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename2, target2), file2); expectLastCall().once(); // file2 is *not* queued because file1 must be replicated first @@ -215,10 +216,10 @@ public class SequentialWorkAssignerTest { Map cluster1Work = queuedWork.get("cluster1"); Assert.assertEquals(2, cluster1Work.size()); Assert.assertTrue(cluster1Work.containsKey(target1.getSourceTableId())); - Assert.assertEquals(DistributedWorkQueueWorkAssigner.getQueueKey(filename1, target1), cluster1Work.get(target1.getSourceTableId())); + Assert.assertEquals(DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename1, target1), cluster1Work.get(target1.getSourceTableId())); Assert.assertTrue(cluster1Work.containsKey(target2.getSourceTableId())); - Assert.assertEquals(DistributedWorkQueueWorkAssigner.getQueueKey(filename2, target2), cluster1Work.get(target2.getSourceTableId())); + Assert.assertEquals(DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename2, target2), cluster1Work.get(target2.getSourceTableId())); } @Test @@ -275,10 +276,10 @@ public class SequentialWorkAssignerTest { assigner.setMaxQueueSize(Integer.MAX_VALUE); // Make sure we expect the invocations in the correct order (accumulo is sorted) - workQueue.addWork(DistributedWorkQueueWorkAssigner.getQueueKey(filename1, target1), file1); + workQueue.addWork(DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename1, target1), file1); expectLastCall().once(); - workQueue.addWork(DistributedWorkQueueWorkAssigner.getQueueKey(filename2, target2), file2); + workQueue.addWork(DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename2, target2), file2); expectLastCall().once(); // file2 is *not* queued because file1 must be replicated first @@ -295,12 +296,12 @@ public class SequentialWorkAssignerTest { Map cluster1Work = queuedWork.get("cluster1"); Assert.assertEquals(1, cluster1Work.size()); Assert.assertTrue(cluster1Work.containsKey(target1.getSourceTableId())); - 
Assert.assertEquals(DistributedWorkQueueWorkAssigner.getQueueKey(filename1, target1), cluster1Work.get(target1.getSourceTableId())); + Assert.assertEquals(DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename1, target1), cluster1Work.get(target1.getSourceTableId())); Map cluster2Work = queuedWork.get("cluster2"); Assert.assertEquals(1, cluster2Work.size()); Assert.assertTrue(cluster2Work.containsKey(target2.getSourceTableId())); - Assert.assertEquals(DistributedWorkQueueWorkAssigner.getQueueKey(filename2, target2), cluster2Work.get(target2.getSourceTableId())); + Assert.assertEquals(DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename2, target2), cluster2Work.get(target2.getSourceTableId())); } @Test @@ -313,8 +314,8 @@ public class SequentialWorkAssignerTest { Map cluster1Work = new TreeMap<>(); // Two files for cluster1, one for table '1' and another for table '2' we havce assigned work for - cluster1Work.put("1", DistributedWorkQueueWorkAssigner.getQueueKey("file1", new ReplicationTarget("cluster1", "1", "1"))); - cluster1Work.put("2", DistributedWorkQueueWorkAssigner.getQueueKey("file2", new ReplicationTarget("cluster1", "2", "2"))); + cluster1Work.put("1", DistributedWorkQueueWorkAssignerHelper.getQueueKey("file1", new ReplicationTarget("cluster1", "1", "1"))); + cluster1Work.put("2", DistributedWorkQueueWorkAssignerHelper.getQueueKey("file2", new ReplicationTarget("cluster1", "2", "2"))); queuedWork.put("cluster1", cluster1Work); @@ -329,11 +330,11 @@ public class SequentialWorkAssignerTest { // file1 replicated expect( zooCache.get(ZooUtil.getRoot("instance") + Constants.ZREPLICATION_WORK_QUEUE + "/" - + DistributedWorkQueueWorkAssigner.getQueueKey("file1", new ReplicationTarget("cluster1", "1", "1")))).andReturn(null); + + DistributedWorkQueueWorkAssignerHelper.getQueueKey("file1", new ReplicationTarget("cluster1", "1", "1")))).andReturn(null); // file2 still needs to replicate expect( zooCache.get(ZooUtil.getRoot("instance") + 
Constants.ZREPLICATION_WORK_QUEUE + "/" - + DistributedWorkQueueWorkAssigner.getQueueKey("file2", new ReplicationTarget("cluster1", "2", "2")))).andReturn(new byte[0]); + + DistributedWorkQueueWorkAssignerHelper.getQueueKey("file2", new ReplicationTarget("cluster1", "2", "2")))).andReturn(new byte[0]); replay(workQueue, zooCache, conn, inst); @@ -342,7 +343,7 @@ public class SequentialWorkAssignerTest { verify(workQueue, zooCache, conn, inst); Assert.assertEquals(1, cluster1Work.size()); - Assert.assertEquals(DistributedWorkQueueWorkAssigner.getQueueKey("file2", new ReplicationTarget("cluster1", "2", "2")), cluster1Work.get("2")); + Assert.assertEquals(DistributedWorkQueueWorkAssignerHelper.getQueueKey("file2", new ReplicationTarget("cluster1", "2", "2")), cluster1Work.get("2")); } @Test @@ -394,7 +395,7 @@ public class SequentialWorkAssignerTest { // Treat filename1 as we have already submitted it for replication Map> queuedWork = new HashMap<>(); Map queuedWorkForCluster = new HashMap<>(); - queuedWorkForCluster.put(target.getSourceTableId(), DistributedWorkQueueWorkAssigner.getQueueKey(filename1, target)); + queuedWorkForCluster.put(target.getSourceTableId(), DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename1, target)); queuedWork.put("cluster1", queuedWorkForCluster); assigner.setQueuedWork(queuedWork); @@ -402,7 +403,7 @@ public class SequentialWorkAssignerTest { assigner.setMaxQueueSize(Integer.MAX_VALUE); // Make sure we expect the invocations in the correct order (accumulo is sorted) - workQueue.addWork(DistributedWorkQueueWorkAssigner.getQueueKey(filename2, target), file2); + workQueue.addWork(DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename2, target), file2); expectLastCall().once(); // file2 is queued because we remove file1 because it's fully replicated @@ -418,6 +419,6 @@ public class SequentialWorkAssignerTest { Map cluster1Work = queuedWork.get("cluster1"); Assert.assertEquals(1, cluster1Work.size()); 
Assert.assertTrue(cluster1Work.containsKey(target.getSourceTableId())); - Assert.assertEquals(DistributedWorkQueueWorkAssigner.getQueueKey(filename2, target), cluster1Work.get(target.getSourceTableId())); + Assert.assertEquals(DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename2, target), cluster1Work.get(target.getSourceTableId())); } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/070ceb1d/server/master/src/test/java/org/apache/accumulo/master/replication/UnorderedWorkAssignerTest.java ---------------------------------------------------------------------- diff --git a/server/master/src/test/java/org/apache/accumulo/master/replication/UnorderedWorkAssignerTest.java b/server/master/src/test/java/org/apache/accumulo/master/replication/UnorderedWorkAssignerTest.java index 68a9f5c..0c9384e 100644 --- a/server/master/src/test/java/org/apache/accumulo/master/replication/UnorderedWorkAssignerTest.java +++ b/server/master/src/test/java/org/apache/accumulo/master/replication/UnorderedWorkAssignerTest.java @@ -45,6 +45,7 @@ import org.apache.accumulo.core.replication.StatusUtil; import org.apache.accumulo.core.replication.proto.Replication.Status; import org.apache.accumulo.core.security.Credentials; import org.apache.accumulo.core.security.TablePermission; +import org.apache.accumulo.server.replication.DistributedWorkQueueWorkAssignerHelper; import org.apache.accumulo.server.replication.ReplicationTable; import org.apache.accumulo.server.zookeeper.DistributedWorkQueue; import org.apache.accumulo.server.zookeeper.ZooCache; @@ -83,8 +84,8 @@ public class UnorderedWorkAssignerTest { Path p = new Path("/accumulo/wal/tserver+port/" + UUID.randomUUID()); - String expectedQueueKey = p.getName() + DistributedWorkQueueWorkAssigner.KEY_SEPARATOR + target.getPeerName() + DistributedWorkQueueWorkAssigner.KEY_SEPARATOR - + target.getRemoteIdentifier() + DistributedWorkQueueWorkAssigner.KEY_SEPARATOR + target.getSourceTableId(); + String expectedQueueKey = p.getName() + 
DistributedWorkQueueWorkAssignerHelper.KEY_SEPARATOR + target.getPeerName() + DistributedWorkQueueWorkAssignerHelper.KEY_SEPARATOR + + target.getRemoteIdentifier() + DistributedWorkQueueWorkAssignerHelper.KEY_SEPARATOR + target.getSourceTableId(); workQueue.addWork(expectedQueueKey, p.toString()); expectLastCall().once(); @@ -120,9 +121,9 @@ public class UnorderedWorkAssignerTest { public void createWorkForFilesNeedingIt() throws Exception { ReplicationTarget target1 = new ReplicationTarget("cluster1", "table1", "1"), target2 = new ReplicationTarget("cluster1", "table2", "2"); Text serializedTarget1 = target1.toText(), serializedTarget2 = target2.toText(); - String keyTarget1 = target1.getPeerName() + DistributedWorkQueueWorkAssigner.KEY_SEPARATOR + target1.getRemoteIdentifier() - + DistributedWorkQueueWorkAssigner.KEY_SEPARATOR + target1.getSourceTableId(), keyTarget2 = target2.getPeerName() - + DistributedWorkQueueWorkAssigner.KEY_SEPARATOR + target2.getRemoteIdentifier() + DistributedWorkQueueWorkAssigner.KEY_SEPARATOR + String keyTarget1 = target1.getPeerName() + DistributedWorkQueueWorkAssignerHelper.KEY_SEPARATOR + target1.getRemoteIdentifier() + + DistributedWorkQueueWorkAssignerHelper.KEY_SEPARATOR + target1.getSourceTableId(), keyTarget2 = target2.getPeerName() + + DistributedWorkQueueWorkAssignerHelper.KEY_SEPARATOR + target2.getRemoteIdentifier() + DistributedWorkQueueWorkAssignerHelper.KEY_SEPARATOR + target2.getSourceTableId(); MockInstance inst = new MockInstance(test.getMethodName()); @@ -255,8 +256,8 @@ public class UnorderedWorkAssignerTest { assigner.setQueuedWork(queuedWork); ReplicationTarget target = new ReplicationTarget("cluster1", "table1", "1"); - String serializedTarget = target.getPeerName() + DistributedWorkQueueWorkAssigner.KEY_SEPARATOR + target.getRemoteIdentifier() - + DistributedWorkQueueWorkAssigner.KEY_SEPARATOR + target.getSourceTableId(); + String serializedTarget = target.getPeerName() + 
DistributedWorkQueueWorkAssignerHelper.KEY_SEPARATOR + target.getRemoteIdentifier() + + DistributedWorkQueueWorkAssignerHelper.KEY_SEPARATOR + target.getSourceTableId(); queuedWork.add("wal1|" + serializedTarget.toString()); http://git-wip-us.apache.org/repos/asf/accumulo/blob/070ceb1d/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ReplicationServlet.java ---------------------------------------------------------------------- diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ReplicationServlet.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ReplicationServlet.java index 810757e..ab83b4a 100644 --- a/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ReplicationServlet.java +++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ReplicationServlet.java @@ -49,11 +49,11 @@ import org.apache.accumulo.core.replication.proto.Replication.Status; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.security.Credentials; import org.apache.accumulo.core.zookeeper.ZooUtil; -import org.apache.accumulo.master.replication.DistributedWorkQueueWorkAssigner; import org.apache.accumulo.monitor.util.Table; import org.apache.accumulo.monitor.util.celltypes.NumberType; import org.apache.accumulo.server.client.HdfsZooInstance; import org.apache.accumulo.server.conf.ServerConfiguration; +import org.apache.accumulo.server.replication.DistributedWorkQueueWorkAssignerHelper; import org.apache.accumulo.server.security.SystemCredentials; import org.apache.accumulo.server.zookeeper.DistributedWorkQueue; import org.apache.accumulo.server.zookeeper.ZooCache; @@ -212,7 +212,7 @@ public class ReplicationServlet extends BasicServlet { DistributedWorkQueue workQueue = new DistributedWorkQueue(workQueuePath, ServerConfiguration.getSystemConfiguration(inst)); for (String queueKey : workQueue.getWorkQueued()) { - Entry queueKeyPair = 
DistributedWorkQueueWorkAssigner.fromQueueKey(queueKey); + Entry queueKeyPair = DistributedWorkQueueWorkAssignerHelper.fromQueueKey(queueKey); String filename = queueKeyPair.getKey(); ReplicationTarget target = queueKeyPair.getValue(); http://git-wip-us.apache.org/repos/asf/accumulo/blob/070ceb1d/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java index 199de91..f1bba6b 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java @@ -37,9 +37,9 @@ import org.apache.accumulo.core.replication.ReplicationTarget; import org.apache.accumulo.core.replication.StatusUtil; import org.apache.accumulo.core.replication.proto.Replication.Status; import org.apache.accumulo.core.security.Credentials; -import org.apache.accumulo.master.replication.DistributedWorkQueueWorkAssigner; import org.apache.accumulo.server.conf.ServerConfiguration; import org.apache.accumulo.server.fs.VolumeManager; +import org.apache.accumulo.server.replication.DistributedWorkQueueWorkAssignerHelper; import org.apache.accumulo.server.replication.ReplicationTable; import org.apache.accumulo.server.zookeeper.DistributedWorkQueue.Processor; import org.apache.hadoop.fs.Path; @@ -76,7 +76,7 @@ public class ReplicationProcessor implements Processor { @Override public void process(String workID, byte[] data) { - ReplicationTarget target = DistributedWorkQueueWorkAssigner.fromQueueKey(workID).getValue(); + ReplicationTarget target = DistributedWorkQueueWorkAssignerHelper.fromQueueKey(workID).getValue(); String file = new String(data); log.debug("Received 
replication work for {} to {}", file, target); http://git-wip-us.apache.org/repos/asf/accumulo/blob/070ceb1d/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/ReplicationProcessorTest.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/ReplicationProcessorTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/ReplicationProcessorTest.java index 6ed589a..e37d78a 100644 --- a/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/ReplicationProcessorTest.java +++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/ReplicationProcessorTest.java @@ -29,8 +29,8 @@ import org.apache.accumulo.core.replication.ReplicaSystemHelper; import org.apache.accumulo.core.replication.ReplicationTarget; import org.apache.accumulo.core.replication.proto.Replication.Status; import org.apache.accumulo.core.security.Credentials; -import org.apache.accumulo.master.replication.DistributedWorkQueueWorkAssigner; import org.apache.accumulo.server.fs.VolumeManager; +import org.apache.accumulo.server.replication.DistributedWorkQueueWorkAssignerHelper; import org.apache.hadoop.fs.Path; import org.easymock.EasyMock; import org.junit.Assert; @@ -83,7 +83,7 @@ public class ReplicationProcessorTest { Status status = Status.newBuilder().setBegin(0).setEnd(0).setInfiniteEnd(true).setClosed(true).build(); Path path = new Path("/accumulo"); - String queueKey = DistributedWorkQueueWorkAssigner.getQueueKey(path.toString(), target); + String queueKey = DistributedWorkQueueWorkAssignerHelper.getQueueKey(path.toString(), target); EasyMock.expect(proc.getReplicaSystem(target)).andReturn(replica); EasyMock.expect(proc.getStatus(path.toString(), target)).andReturn(status); http://git-wip-us.apache.org/repos/asf/accumulo/blob/070ceb1d/test/src/test/java/org/apache/accumulo/test/replication/UnorderedWorkAssignerReplicationIT.java 
---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/replication/UnorderedWorkAssignerReplicationIT.java b/test/src/test/java/org/apache/accumulo/test/replication/UnorderedWorkAssignerReplicationIT.java new file mode 100644 index 0000000..17c8179 --- /dev/null +++ b/test/src/test/java/org/apache/accumulo/test/replication/UnorderedWorkAssignerReplicationIT.java @@ -0,0 +1,643 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.test.replication; + +import java.util.Iterator; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.replication.ReplicaSystemFactory; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.PartialKey; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.metadata.MetadataTable; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection; +import org.apache.accumulo.core.protobuf.ProtobufUtil; +import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection; +import org.apache.accumulo.core.replication.StatusUtil; +import org.apache.accumulo.core.replication.proto.Replication.Status; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.security.TablePermission; +import org.apache.accumulo.core.util.UtilWaitThread; +import org.apache.accumulo.master.replication.UnorderedWorkAssigner; +import org.apache.accumulo.minicluster.ServerType; +import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl; +import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl; +import org.apache.accumulo.minicluster.impl.ProcessReference; +import org.apache.accumulo.server.replication.ReplicationTable; +import org.apache.accumulo.test.functional.ConfigurableMacIT; 
+import org.apache.accumulo.tserver.TabletServer; +import org.apache.accumulo.tserver.replication.AccumuloReplicaSystem; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.RawLocalFileSystem; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class UnorderedWorkAssignerReplicationIT extends ConfigurableMacIT { + private static final Logger log = LoggerFactory.getLogger(UnorderedWorkAssignerReplicationIT.class); + + private ExecutorService executor; + + @Before + public void createExecutor() { + executor = Executors.newSingleThreadExecutor(); + } + + @After + public void stopExecutor() { + if (null != executor) { + executor.shutdownNow(); + } + } + + @Override + public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { + cfg.setNumTservers(1); + cfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "2M"); + cfg.setProperty(Property.GC_CYCLE_START, "1s"); + cfg.setProperty(Property.GC_CYCLE_DELAY, "5s"); + cfg.setProperty(Property.REPLICATION_WORK_ASSIGNMENT_SLEEP, "1s"); + cfg.setProperty(Property.MASTER_REPLICATION_SCAN_INTERVAL, "1s"); + cfg.setProperty(Property.REPLICATION_MAX_UNIT_SIZE, "8M"); + cfg.setProperty(Property.REPLICATION_NAME, "master"); + cfg.setProperty(Property.REPLICATION_WORK_ASSIGNER, UnorderedWorkAssigner.class.getName()); + hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName()); + } + + @Test(timeout = 60 * 5000) + public void dataWasReplicatedToThePeer() throws Exception { + MiniAccumuloConfigImpl peerCfg = new MiniAccumuloConfigImpl(createTestDir(this.getClass().getName() + "_" + this.testName.getMethodName() + "_peer"), + ROOT_PASSWORD); + peerCfg.setNumTservers(1); + peerCfg.setInstanceName("peer"); + peerCfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "5M"); + peerCfg.setProperty(Property.REPLICATION_WORK_ASSIGNMENT_SLEEP, "1s"); + 
peerCfg.setProperty(Property.MASTER_REPLICATION_SCAN_INTERVAL, "1s"); + peerCfg.setProperty(Property.REPLICATION_NAME, "peer"); + peerCfg.setProperty(Property.REPLICATION_WORK_ASSIGNER, UnorderedWorkAssigner.class.getName()); + MiniAccumuloClusterImpl peerCluster = peerCfg.build(); + + peerCluster.start(); + + try { + final Connector connMaster = getConnector(); + final Connector connPeer = peerCluster.getConnector("root", ROOT_PASSWORD); + + ReplicationTable.create(connMaster); + + String peerUserName = "peer", peerPassword = "foo"; + + String peerClusterName = "peer"; + + connPeer.securityOperations().createLocalUser(peerUserName, new PasswordToken(peerPassword)); + + connMaster.instanceOperations().setProperty(Property.REPLICATION_PEER_USER.getKey() + peerClusterName, peerUserName); + connMaster.instanceOperations().setProperty(Property.REPLICATION_PEER_PASSWORD.getKey() + peerClusterName, peerPassword); + + // ...peer = AccumuloReplicaSystem,instanceName,zookeepers + connMaster.instanceOperations().setProperty( + Property.REPLICATION_PEERS.getKey() + peerClusterName, + ReplicaSystemFactory.getPeerConfigurationValue(AccumuloReplicaSystem.class, + AccumuloReplicaSystem.buildConfiguration(peerCluster.getInstanceName(), peerCluster.getZooKeepers()))); + + final String masterTable = "master", peerTable = "peer"; + + connMaster.tableOperations().create(masterTable); + String masterTableId = connMaster.tableOperations().tableIdMap().get(masterTable); + Assert.assertNotNull(masterTableId); + + connPeer.tableOperations().create(peerTable); + String peerTableId = connPeer.tableOperations().tableIdMap().get(peerTable); + Assert.assertNotNull(peerTableId); + + connPeer.securityOperations().grantTablePermission(peerUserName, peerTable, TablePermission.WRITE); + + // Replicate this table to the peerClusterName in a table with the peerTableId table id + connMaster.tableOperations().setProperty(masterTable, Property.TABLE_REPLICATION.getKey(), "true"); + 
connMaster.tableOperations().setProperty(masterTable, Property.TABLE_REPLICATION_TARGETS.getKey() + peerClusterName, peerTableId); + + // Write some data to table1 + BatchWriter bw = connMaster.createBatchWriter(masterTable, new BatchWriterConfig()); + for (int rows = 0; rows < 5000; rows++) { + Mutation m = new Mutation(Integer.toString(rows)); + for (int cols = 0; cols < 100; cols++) { + String value = Integer.toString(cols); + m.put(value, "", value); + } + bw.addMutation(m); + } + + bw.close(); + + log.info("Wrote all data to master cluster"); + + final Set filesNeedingReplication = connMaster.replicationOperations().referencedFiles(masterTable); + + for (ProcessReference proc : cluster.getProcesses().get(ServerType.TABLET_SERVER)) { + cluster.killProcess(ServerType.TABLET_SERVER, proc); + } + cluster.exec(TabletServer.class); + + log.info("TabletServer restarted"); + for (@SuppressWarnings("unused") + Entry e : ReplicationTable.getScanner(connMaster)) {} + log.info("TabletServer is online"); + + log.info(""); + log.info("Fetching metadata records:"); + for (Entry kv : connMaster.createScanner(MetadataTable.NAME, Authorizations.EMPTY)) { + if (ReplicationSection.COLF.equals(kv.getKey().getColumnFamily())) { + log.info(kv.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(kv.getValue().get()))); + } else { + log.info(kv.getKey().toStringNoTruncate() + " " + kv.getValue()); + } + } + + log.info(""); + log.info("Fetching replication records:"); + for (Entry kv : connMaster.createScanner(ReplicationTable.NAME, Authorizations.EMPTY)) { + log.info(kv.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(kv.getValue().get()))); + } + + Future future = executor.submit(new Callable() { + + @Override + public Boolean call() throws Exception { + connMaster.replicationOperations().drain(masterTable, filesNeedingReplication); + log.info("Drain completed"); + return true; + } + + }); + + try { + future.get(30, 
TimeUnit.SECONDS); + } catch (TimeoutException e) { + future.cancel(true); + Assert.fail("Drain did not finish within 30 seconds"); + } + + log.info("drain completed"); + + log.info(""); + log.info("Fetching metadata records:"); + for (Entry kv : connMaster.createScanner(MetadataTable.NAME, Authorizations.EMPTY)) { + if (ReplicationSection.COLF.equals(kv.getKey().getColumnFamily())) { + log.info(kv.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(kv.getValue().get()))); + } else { + log.info(kv.getKey().toStringNoTruncate() + " " + kv.getValue()); + } + } + + log.info(""); + log.info("Fetching replication records:"); + for (Entry kv : connMaster.createScanner(ReplicationTable.NAME, Authorizations.EMPTY)) { + log.info(kv.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(kv.getValue().get()))); + } + + Scanner master = connMaster.createScanner(masterTable, Authorizations.EMPTY), peer = connPeer.createScanner(peerTable, Authorizations.EMPTY); + Iterator> masterIter = master.iterator(), peerIter = peer.iterator(); + Entry masterEntry = null, peerEntry = null; + while (masterIter.hasNext() && peerIter.hasNext()) { + masterEntry = masterIter.next(); + peerEntry = peerIter.next(); + Assert.assertEquals(masterEntry.getKey() + " was not equal to " + peerEntry.getKey(), 0, + masterEntry.getKey().compareTo(peerEntry.getKey(), PartialKey.ROW_COLFAM_COLQUAL_COLVIS)); + Assert.assertEquals(masterEntry.getValue(), peerEntry.getValue()); + } + + log.info("Last master entry: " + masterEntry); + log.info("Last peer entry: " + peerEntry); + + Assert.assertFalse("Had more data to read from the master", masterIter.hasNext()); + Assert.assertFalse("Had more data to read from the peer", peerIter.hasNext()); + } finally { + peerCluster.stop(); + } + } + + @Test(timeout = 60 * 5000) + public void dataReplicatedToCorrectTable() throws Exception { + MiniAccumuloConfigImpl peerCfg = new 
MiniAccumuloConfigImpl(createTestDir(this.getClass().getName() + "_" + this.testName.getMethodName() + "_peer"), + ROOT_PASSWORD); + peerCfg.setNumTservers(1); + peerCfg.setInstanceName("peer"); + peerCfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "5M"); + peerCfg.setProperty(Property.REPLICATION_WORK_ASSIGNMENT_SLEEP, "1s"); + peerCfg.setProperty(Property.MASTER_REPLICATION_SCAN_INTERVAL, "1s"); + peerCfg.setProperty(Property.REPLICATION_NAME, "peer"); + peerCfg.setProperty(Property.REPLICATION_WORK_ASSIGNER, UnorderedWorkAssigner.class.getName()); + MiniAccumuloClusterImpl peer1Cluster = peerCfg.build(); + + peer1Cluster.start(); + + try { + Connector connMaster = getConnector(); + Connector connPeer = peer1Cluster.getConnector("root", ROOT_PASSWORD); + + String peerClusterName = "peer"; + String peerUserName = "peer", peerPassword = "foo"; + + // Create local user + connPeer.securityOperations().createLocalUser(peerUserName, new PasswordToken(peerPassword)); + + connMaster.instanceOperations().setProperty(Property.REPLICATION_PEER_USER.getKey() + peerClusterName, peerUserName); + connMaster.instanceOperations().setProperty(Property.REPLICATION_PEER_PASSWORD.getKey() + peerClusterName, peerPassword); + + // ...peer = AccumuloReplicaSystem,instanceName,zookeepers + connMaster.instanceOperations().setProperty( + Property.REPLICATION_PEERS.getKey() + peerClusterName, + ReplicaSystemFactory.getPeerConfigurationValue(AccumuloReplicaSystem.class, + AccumuloReplicaSystem.buildConfiguration(peer1Cluster.getInstanceName(), peer1Cluster.getZooKeepers()))); + + String masterTable1 = "master1", peerTable1 = "peer1", masterTable2 = "master2", peerTable2 = "peer2"; + + // Create tables + connMaster.tableOperations().create(masterTable1); + String masterTableId1 = connMaster.tableOperations().tableIdMap().get(masterTable1); + Assert.assertNotNull(masterTableId1); + + connMaster.tableOperations().create(masterTable2); + String masterTableId2 = 
connMaster.tableOperations().tableIdMap().get(masterTable2); + Assert.assertNotNull(masterTableId2); + + connPeer.tableOperations().create(peerTable1); + String peerTableId1 = connPeer.tableOperations().tableIdMap().get(peerTable1); + Assert.assertNotNull(peerTableId1); + + connPeer.tableOperations().create(peerTable2); + String peerTableId2 = connPeer.tableOperations().tableIdMap().get(peerTable2); + Assert.assertNotNull(peerTableId2); + + // Grant write permission + connPeer.securityOperations().grantTablePermission(peerUserName, peerTable1, TablePermission.WRITE); + connPeer.securityOperations().grantTablePermission(peerUserName, peerTable2, TablePermission.WRITE); + + // Replicate this table to the peerClusterName in a table with the peerTableId table id + connMaster.tableOperations().setProperty(masterTable1, Property.TABLE_REPLICATION.getKey(), "true"); + connMaster.tableOperations().setProperty(masterTable1, Property.TABLE_REPLICATION_TARGETS.getKey() + peerClusterName, peerTableId1); + + connMaster.tableOperations().setProperty(masterTable2, Property.TABLE_REPLICATION.getKey(), "true"); + connMaster.tableOperations().setProperty(masterTable2, Property.TABLE_REPLICATION_TARGETS.getKey() + peerClusterName, peerTableId2); + + // Write some data to table1 + BatchWriter bw = connMaster.createBatchWriter(masterTable1, new BatchWriterConfig()); + long masterTable1Records = 0l; + for (int rows = 0; rows < 2500; rows++) { + Mutation m = new Mutation(masterTable1 + rows); + for (int cols = 0; cols < 100; cols++) { + String value = Integer.toString(cols); + m.put(value, "", value); + masterTable1Records++; + } + bw.addMutation(m); + } + + bw.close(); + + // Write some data to table2 + bw = connMaster.createBatchWriter(masterTable2, new BatchWriterConfig()); + long masterTable2Records = 0l; + for (int rows = 0; rows < 2500; rows++) { + Mutation m = new Mutation(masterTable2 + rows); + for (int cols = 0; cols < 100; cols++) { + String value = Integer.toString(cols); + 
m.put(value, "", value); + masterTable2Records++; + } + bw.addMutation(m); + } + + bw.close(); + + log.info("Wrote all data to master cluster"); + + Set filesFor1 = connMaster.replicationOperations().referencedFiles(masterTable1), filesFor2 = connMaster.replicationOperations().referencedFiles( + masterTable2); + + while (!connMaster.tableOperations().exists(ReplicationTable.NAME)) { + Thread.sleep(500); + } + + // Restart the tserver to force a close on the WAL + for (ProcessReference proc : cluster.getProcesses().get(ServerType.TABLET_SERVER)) { + cluster.killProcess(ServerType.TABLET_SERVER, proc); + } + cluster.exec(TabletServer.class); + + log.info("Restarted the tserver"); + + // Read the data -- the tserver is back up and running + for (@SuppressWarnings("unused") + Entry entry : connMaster.createScanner(masterTable1, Authorizations.EMPTY)) {} + + // Wait for both tables to be replicated + log.info("Waiting for {} for {}", filesFor1, masterTable1); + connMaster.replicationOperations().drain(masterTable1, filesFor1); + + log.info("Waiting for {} for {}", filesFor2, masterTable2); + connMaster.replicationOperations().drain(masterTable2, filesFor2); + + long countTable = 0l; + for (Entry entry : connPeer.createScanner(peerTable1, Authorizations.EMPTY)) { + countTable++; + Assert.assertTrue("Found unexpected key-value" + entry.getKey().toStringNoTruncate() + " " + entry.getValue(), entry.getKey().getRow().toString() + .startsWith(masterTable1)); + } + + log.info("Found {} records in {}", countTable, peerTable1); + Assert.assertEquals(masterTable1Records, countTable); + + countTable = 0l; + for (Entry entry : connPeer.createScanner(peerTable2, Authorizations.EMPTY)) { + countTable++; + Assert.assertTrue("Found unexpected key-value" + entry.getKey().toStringNoTruncate() + " " + entry.getValue(), entry.getKey().getRow().toString() + .startsWith(masterTable2)); + } + + log.info("Found {} records in {}", countTable, peerTable2); + 
Assert.assertEquals(masterTable2Records, countTable); + + } finally { + peer1Cluster.stop(); + } + } + + @Test(timeout = 60 * 5000) + public void dataWasReplicatedToThePeerWithoutDrain() throws Exception { + MiniAccumuloConfigImpl peerCfg = new MiniAccumuloConfigImpl(createTestDir(this.getClass().getName() + "_" + this.testName.getMethodName() + "_peer"), + ROOT_PASSWORD); + peerCfg.setNumTservers(1); + peerCfg.setInstanceName("peer"); + peerCfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "5M"); + peerCfg.setProperty(Property.REPLICATION_WORK_ASSIGNMENT_SLEEP, "1s"); + peerCfg.setProperty(Property.MASTER_REPLICATION_SCAN_INTERVAL, "1s"); + peerCfg.setProperty(Property.REPLICATION_NAME, "peer"); + MiniAccumuloClusterImpl peerCluster = peerCfg.build(); + + peerCluster.start(); + + Connector connMaster = getConnector(); + Connector connPeer = peerCluster.getConnector("root", ROOT_PASSWORD); + + String peerUserName = "repl"; + String peerPassword = "passwd"; + + // Create a user on the peer for replication to use + connPeer.securityOperations().createLocalUser(peerUserName, new PasswordToken(peerPassword)); + + String peerClusterName = "peer"; + + // ...peer = AccumuloReplicaSystem,instanceName,zookeepers + connMaster.instanceOperations().setProperty( + Property.REPLICATION_PEERS.getKey() + peerClusterName, + ReplicaSystemFactory.getPeerConfigurationValue(AccumuloReplicaSystem.class, + AccumuloReplicaSystem.buildConfiguration(peerCluster.getInstanceName(), peerCluster.getZooKeepers()))); + + // Configure the credentials we should use to authenticate ourselves to the peer for replication + connMaster.instanceOperations().setProperty(Property.REPLICATION_PEER_USER.getKey() + peerClusterName, peerUserName); + connMaster.instanceOperations().setProperty(Property.REPLICATION_PEER_PASSWORD.getKey() + peerClusterName, peerPassword); + + String masterTable = "master", peerTable = "peer"; + + connMaster.tableOperations().create(masterTable); + String masterTableId = 
connMaster.tableOperations().tableIdMap().get(masterTable); + Assert.assertNotNull(masterTableId); + + connPeer.tableOperations().create(peerTable); + String peerTableId = connPeer.tableOperations().tableIdMap().get(peerTable); + Assert.assertNotNull(peerTableId); + + // Give our replication user the ability to write to the table + connPeer.securityOperations().grantTablePermission(peerUserName, peerTable, TablePermission.WRITE); + + // Replicate this table to the peerClusterName in a table with the peerTableId table id + connMaster.tableOperations().setProperty(masterTable, Property.TABLE_REPLICATION.getKey(), "true"); + connMaster.tableOperations().setProperty(masterTable, Property.TABLE_REPLICATION_TARGETS.getKey() + peerClusterName, peerTableId); + + // Write some data to table1 + BatchWriter bw = connMaster.createBatchWriter(masterTable, new BatchWriterConfig()); + for (int rows = 0; rows < 5000; rows++) { + Mutation m = new Mutation(Integer.toString(rows)); + for (int cols = 0; cols < 100; cols++) { + String value = Integer.toString(cols); + m.put(value, "", value); + } + bw.addMutation(m); + } + + bw.close(); + + log.info("Wrote all data to master cluster"); + + Set files = connMaster.replicationOperations().referencedFiles(masterTable); + + for (ProcessReference proc : cluster.getProcesses().get(ServerType.TABLET_SERVER)) { + cluster.killProcess(ServerType.TABLET_SERVER, proc); + } + + cluster.exec(TabletServer.class); + + for (@SuppressWarnings("unused") + Entry kv : connMaster.createScanner(masterTable, Authorizations.EMPTY)) {} + + for (Entry kv : connMaster.createScanner(ReplicationTable.NAME, Authorizations.EMPTY)) { + log.debug(kv.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(kv.getValue().get()))); + } + + connMaster.replicationOperations().drain(masterTable, files); + + Scanner master = connMaster.createScanner(masterTable, Authorizations.EMPTY), peer = connPeer.createScanner(peerTable, Authorizations.EMPTY); + 
Iterator> masterIter = master.iterator(), peerIter = peer.iterator(); + while (masterIter.hasNext() && peerIter.hasNext()) { + Entry masterEntry = masterIter.next(), peerEntry = peerIter.next(); + Assert.assertEquals(peerEntry.getKey() + " was not equal to " + peerEntry.getKey(), 0, + masterEntry.getKey().compareTo(peerEntry.getKey(), PartialKey.ROW_COLFAM_COLQUAL_COLVIS)); + Assert.assertEquals(masterEntry.getValue(), peerEntry.getValue()); + } + + Assert.assertFalse("Had more data to read from the master", masterIter.hasNext()); + Assert.assertFalse("Had more data to read from the peer", peerIter.hasNext()); + + peerCluster.stop(); + }

  /**
   * Checks that two replicated master tables each land in their own peer table, without explicitly draining replication before verifying.
   * <p>
   * The test starts a single-tserver peer cluster, configures the master to replicate {@code master1} to {@code peer1} and {@code master2} to
   * {@code peer2}, writes 2500 rows of 100 columns into each master table, restarts the tablet servers and then polls the replication table until a
   * fully-replicated work entry shows up. Finally it asserts that each peer table holds at least one entry and that every row it holds carries the name
   * of the matching master table as a prefix.
   *
   * @throws Exception
   *           if cluster setup, table configuration, writing or scanning fails
   */
  @Test(timeout = 60 * 5000)
  public void dataReplicatedToCorrectTableWithoutDrain() throws Exception {
    MiniAccumuloConfigImpl peerCfg = new MiniAccumuloConfigImpl(createTestDir(this.getClass().getName() + "_" + this.testName.getMethodName() + "_peer"),
        ROOT_PASSWORD);
    peerCfg.setNumTservers(1);
    peerCfg.setInstanceName("peer");
    peerCfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "5M");
    peerCfg.setProperty(Property.REPLICATION_WORK_ASSIGNMENT_SLEEP, "1s");
    peerCfg.setProperty(Property.MASTER_REPLICATION_SCAN_INTERVAL, "1s");
    peerCfg.setProperty(Property.REPLICATION_NAME, "peer");
    MiniAccumuloClusterImpl peer1Cluster = peerCfg.build();

    peer1Cluster.start();

    try {
      Connector connMaster = getConnector();
      Connector connPeer = peer1Cluster.getConnector("root", ROOT_PASSWORD);

      String peerClusterName = "peer";

      String peerUserName = "repl";
      String peerPassword = "passwd";

      // Create a user on the peer for replication to use
      connPeer.securityOperations().createLocalUser(peerUserName, new PasswordToken(peerPassword));

      // Configure the credentials we should use to authenticate ourselves to the peer for replication
      connMaster.instanceOperations().setProperty(Property.REPLICATION_PEER_USER.getKey() + peerClusterName, peerUserName);
      connMaster.instanceOperations().setProperty(Property.REPLICATION_PEER_PASSWORD.getKey() + peerClusterName, peerPassword);

      // ...peer = AccumuloReplicaSystem,instanceName,zookeepers
      connMaster.instanceOperations().setProperty(
          Property.REPLICATION_PEERS.getKey() + peerClusterName,
          ReplicaSystemFactory.getPeerConfigurationValue(AccumuloReplicaSystem.class,
              AccumuloReplicaSystem.buildConfiguration(peer1Cluster.getInstanceName(), peer1Cluster.getZooKeepers())));

      String masterTable1 = "master1", peerTable1 = "peer1", masterTable2 = "master2", peerTable2 = "peer2";

      connMaster.tableOperations().create(masterTable1);
      String masterTableId1 = connMaster.tableOperations().tableIdMap().get(masterTable1);
      Assert.assertNotNull(masterTableId1);

      connMaster.tableOperations().create(masterTable2);
      String masterTableId2 = connMaster.tableOperations().tableIdMap().get(masterTable2);
      Assert.assertNotNull(masterTableId2);

      connPeer.tableOperations().create(peerTable1);
      String peerTableId1 = connPeer.tableOperations().tableIdMap().get(peerTable1);
      Assert.assertNotNull(peerTableId1);

      connPeer.tableOperations().create(peerTable2);
      String peerTableId2 = connPeer.tableOperations().tableIdMap().get(peerTable2);
      Assert.assertNotNull(peerTableId2);

      // Give our replication user the ability to write to the tables
      connPeer.securityOperations().grantTablePermission(peerUserName, peerTable1, TablePermission.WRITE);
      connPeer.securityOperations().grantTablePermission(peerUserName, peerTable2, TablePermission.WRITE);

      // Replicate each master table to the peerClusterName, targeting the peer table with the matching table id
      connMaster.tableOperations().setProperty(masterTable1, Property.TABLE_REPLICATION.getKey(), "true");
      connMaster.tableOperations().setProperty(masterTable1, Property.TABLE_REPLICATION_TARGETS.getKey() + peerClusterName, peerTableId1);

      connMaster.tableOperations().setProperty(masterTable2, Property.TABLE_REPLICATION.getKey(), "true");
      connMaster.tableOperations().setProperty(masterTable2, Property.TABLE_REPLICATION_TARGETS.getKey() + peerClusterName, peerTableId2);

      // Write some data to table1; each row is prefixed with the table name so it can be recognized on the peer
      BatchWriter bw = connMaster.createBatchWriter(masterTable1, new BatchWriterConfig());
      for (int rows = 0; rows < 2500; rows++) {
        Mutation m = new Mutation(masterTable1 + rows);
        for (int cols = 0; cols < 100; cols++) {
          String value = Integer.toString(cols);
          m.put(value, "", value);
        }
        bw.addMutation(m);
      }

      bw.close();

      // Write some data to table2, prefixed the same way with its own table name
      bw = connMaster.createBatchWriter(masterTable2, new BatchWriterConfig());
      for (int rows = 0; rows < 2500; rows++) {
        Mutation m = new Mutation(masterTable2 + rows);
        for (int cols = 0; cols < 100; cols++) {
          String value = Integer.toString(cols);
          m.put(value, "", value);
        }
        bw.addMutation(m);
      }

      bw.close();

      log.info("Wrote all data to master cluster");

      // The replication table must exist before it can be scanned below
      while (!connMaster.tableOperations().exists(ReplicationTable.NAME)) {
        Thread.sleep(500);
      }

      // Kill every tablet server process and start a fresh one.
      // NOTE(review): presumably this closes the write-ahead logs so they become eligible for replication -- confirm.
      for (ProcessReference proc : cluster.getProcesses().get(ServerType.TABLET_SERVER)) {
        cluster.killProcess(ServerType.TABLET_SERVER, proc);
      }

      cluster.exec(TabletServer.class);
      // connMaster.tableOperations().compact(masterTable1, null, null, true, false);
      // connMaster.tableOperations().compact(masterTable2, null, null, true, false);

      // Wait until we fully replicated something: poll the work section up to 10 times, 2 seconds apart
      boolean fullyReplicated = false;
      for (int i = 0; i < 10 && !fullyReplicated; i++) {
        UtilWaitThread.sleep(2000);

        Scanner s = ReplicationTable.getScanner(connMaster);
        WorkSection.limit(s);
        for (Entry<Key,Value> entry : s) {
          Status status = Status.parseFrom(entry.getValue().get());
          if (StatusUtil.isFullyReplicated(status)) {
            fullyReplicated = true;
            break;
          }
        }
      }

      // The previous assertNotEquals(0, fullyReplicated) compared an Integer with a Boolean and therefore could never fail
      Assert.assertTrue("Did not find a fully replicated work entry in the replication table", fullyReplicated);

      long countTable1 = 0L;
      for (Entry<Key,Value> entry : connPeer.createScanner(peerTable1, Authorizations.EMPTY)) {
        countTable1++;
        Assert.assertTrue("Found unexpected key-value" + entry.getKey().toStringNoTruncate() + " " + entry.getValue(), entry.getKey().getRow().toString()
            .startsWith(masterTable1));
      }

      log.info("Found {} records in {}", countTable1, peerTable1);
      Assert.assertTrue(countTable1 > 0);

      long countTable2 = 0L;
      for (Entry<Key,Value> entry : connPeer.createScanner(peerTable2, Authorizations.EMPTY)) {
        countTable2++;
        Assert.assertTrue("Found unexpected key-value" + entry.getKey().toStringNoTruncate() + " " + entry.getValue(), entry.getKey().getRow().toString()
            .startsWith(masterTable2));
      }

      log.info("Found {} records in {}", countTable2, peerTable2);
      Assert.assertTrue(countTable2 > 0);

    } finally {
      // Always shut the peer cluster down, even when an assertion above fails
      peer1Cluster.stop();
    }
  }
}