accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From els...@apache.org
Subject [36/51] [abbrv] git commit: ACCUMULO-378 More rb changes.
Date Sat, 14 Jun 2014 04:55:36 GMT
ACCUMULO-378 More rb changes.


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/ad4ea6b2
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/ad4ea6b2
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/ad4ea6b2

Branch: refs/heads/master
Commit: ad4ea6b248f49d5c79390d2e99f9804c0606ded3
Parents: 856f235
Author: Josh Elser <elserj@apache.org>
Authored: Thu Jun 5 01:31:40 2014 -0400
Committer: Josh Elser <elserj@apache.org>
Committed: Thu Jun 5 01:31:40 2014 -0400

----------------------------------------------------------------------
 .../org/apache/accumulo/core/Constants.java     |  5 ----
 .../client/admin/ReplicationOperations.java     | 21 ++++++++++++---
 .../core/client/impl/ReplicationClient.java     |  1 -
 .../client/impl/ReplicationOperationsImpl.java  |  8 ++++++
 .../core/replication/ReplicationConstants.java  | 28 ++++++++++++++++++++
 .../apache/accumulo/server/init/Initialize.java |  5 ++--
 .../DistributedWorkQueueWorkAssigner.java       |  4 +--
 .../MasterReplicationCoordinator.java           |  4 +--
 .../replication/SequentialWorkAssigner.java     |  4 +--
 .../replication/UnorderedWorkAssigner.java      |  4 +--
 .../replication/SequentialWorkAssignerTest.java |  6 ++---
 .../replication/UnorderedWorkAssignerTest.java  |  5 ++--
 .../monitor/servlets/ReplicationServlet.java    |  4 +--
 .../apache/accumulo/tserver/TabletServer.java   |  3 ++-
 .../tserver/replication/ReplicationWorker.java  |  6 ++---
 .../replication/MultiTserverReplicationIT.java  |  3 ++-
 16 files changed, 80 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/ad4ea6b2/core/src/main/java/org/apache/accumulo/core/Constants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/Constants.java b/core/src/main/java/org/apache/accumulo/core/Constants.java
index 795d6fe..3edba81 100644
--- a/core/src/main/java/org/apache/accumulo/core/Constants.java
+++ b/core/src/main/java/org/apache/accumulo/core/Constants.java
@@ -114,9 +114,4 @@ public class Constants {
   public static final String[] PATH_PROPERTY_ENV_VARS = new String[] {"ACCUMULO_HOME", "ACCUMULO_CONF_DIR"};
 
   public static final String HDFS_TABLES_DIR = "/tables";
-
-  // Constants for replication information in zookeeper
-  public static final String ZREPLICATION = "/replication";
-  public static final String ZREPLICATION_WORK_QUEUE = ZREPLICATION + "/workqueue";
-  public static final String ZREPLICATION_TSERVERS = ZREPLICATION + "/tservers";
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ad4ea6b2/core/src/main/java/org/apache/accumulo/core/client/admin/ReplicationOperations.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/admin/ReplicationOperations.java b/core/src/main/java/org/apache/accumulo/core/client/admin/ReplicationOperations.java
index 5873f73..9267f67 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/admin/ReplicationOperations.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/admin/ReplicationOperations.java
@@ -36,6 +36,13 @@ public interface ReplicationOperations {
    * @param name Name of the cluster, used for configuring replication on tables
    * @param system Type of system to be replicated to
    */
+  public void addPeer(String name, Class<? extends ReplicaSystem> system) throws AccumuloException,
AccumuloSecurityException, PeerExistsException;
+
+  /**
+   * Defines a cluster with the given name using the given {@link ReplicaSystem}.
+   * @param name Name of the cluster, used for configuring replication on tables
+   * @param system Type of system to be replicated to
+   */
   public void addPeer(String name, ReplicaSystem system) throws AccumuloException, AccumuloSecurityException,
PeerExistsException;
 
   /**
@@ -54,7 +61,8 @@ public interface ReplicationOperations {
   public void removePeer(String name) throws AccumuloException, AccumuloSecurityException,
PeerNotFoundException;
 
   /**
-   * Waits for a table to be fully replicated.
+   * Waits for a table to be fully replicated, given the state of files pending replication
for the provided table
+   * at the point in time which this method is invoked.
    * @param tableName The table to wait for
    * @throws AccumuloException
    * @throws AccumuloSecurityException
@@ -62,7 +70,9 @@ public interface ReplicationOperations {
   public void drain(String tableName) throws AccumuloException, AccumuloSecurityException,
TableNotFoundException;
 
   /**
-   * Waits for a table to be fully replicated as determined by the provided tables.
+   * Given the provided set of files that are pending replication for a table, wait for those
+   * files to be fully replicated to all configured peers. This allows for the accurate calculation
+   * when a table, at a given point in time, has been fully replicated.
    * @param tableName The table to wait for
    * @throws AccumuloException
    * @throws AccumuloSecurityException
@@ -70,7 +80,12 @@ public interface ReplicationOperations {
   public void drain(String tableName, Set<String> files) throws AccumuloException,
AccumuloSecurityException, TableNotFoundException;
 
   /**
-   * Gets all of the referenced files for a table.
+   * Gets all of the referenced files for a table from the metadata table. The result of
this method
+   * is intended to be directly supplied to {@link #drain(String, Set)}. This helps determine
when all
+   * data from a given point in time has been fully replicated.
+   * <p>
+   * This also allows callers to get the {@link Set} of files for a table at some time, and
later provide that
+   * {@link Set} to {@link #drain(String,Set)} to wait for all of those files to be replicated.
    * @param tableName
    * @throws TableNotFoundException
    */

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ad4ea6b2/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationClient.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationClient.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationClient.java
index 13c027a..8f15839 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationClient.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationClient.java
@@ -18,7 +18,6 @@ package org.apache.accumulo.core.client.impl;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
-import java.net.UnknownHostException;
 import java.nio.charset.StandardCharsets;
 import java.util.List;
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ad4ea6b2/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java
index 51a5367..8c50ecc 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java
@@ -77,6 +77,14 @@ public class ReplicationOperationsImpl implements ReplicationOperations
{
   }
 
   @Override
+  public void addPeer(String name, Class<? extends ReplicaSystem> system) throws AccumuloException,
AccumuloSecurityException, PeerExistsException {
+    checkNotNull(name);
+    checkNotNull(system);
+
+    addPeer(name, system.getName());
+  }
+
+  @Override
   public void addPeer(String name, ReplicaSystem system) throws AccumuloException, AccumuloSecurityException,
PeerExistsException {
     checkNotNull(name);
     checkNotNull(system);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ad4ea6b2/core/src/main/java/org/apache/accumulo/core/replication/ReplicationConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/replication/ReplicationConstants.java b/core/src/main/java/org/apache/accumulo/core/replication/ReplicationConstants.java
new file mode 100644
index 0000000..517920c
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/replication/ReplicationConstants.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.replication;
+
+/**
+ * 
+ */
+public class ReplicationConstants {
+  // Constants for replication information in zookeeper
+  public static final String ZOO_BASE = "/replication";
+  public static final String ZOO_WORK_QUEUE = ZOO_BASE + "/workqueue";
+  public static final String ZOO_TSERVERS = ZOO_BASE + "/tservers";
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ad4ea6b2/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java b/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
index 1d9fb86..78fa4e7 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
@@ -50,6 +50,7 @@ import org.apache.accumulo.core.metadata.schema.MetadataSchema;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
+import org.apache.accumulo.core.replication.ReplicationConstants;
 import org.apache.accumulo.core.security.SecurityUtil;
 import org.apache.accumulo.core.util.CachedConfiguration;
 import org.apache.accumulo.core.volume.VolumeConfiguration;
@@ -480,8 +481,8 @@ public class Initialize {
     zoo.putPersistentData(zkInstanceRoot + Constants.ZRECOVERY, ZERO_CHAR_ARRAY, NodeExistsPolicy.FAIL);
     zoo.putPersistentData(zkInstanceRoot + Constants.ZMONITOR, EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL);
     zoo.putPersistentData(zkInstanceRoot + Constants.ZMONITOR_LOCK, EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL);
-    zoo.putPersistentData(zkInstanceRoot + Constants.ZREPLICATION, EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL);
-    zoo.putPersistentData(zkInstanceRoot + Constants.ZREPLICATION_TSERVERS, EMPTY_BYTE_ARRAY,
NodeExistsPolicy.FAIL);
+    zoo.putPersistentData(zkInstanceRoot + ReplicationConstants.ZOO_BASE, EMPTY_BYTE_ARRAY,
NodeExistsPolicy.FAIL);
+    zoo.putPersistentData(zkInstanceRoot + ReplicationConstants.ZOO_TSERVERS, EMPTY_BYTE_ARRAY,
NodeExistsPolicy.FAIL);
   }
 
   private static String getInstanceNamePath(Opts opts) throws IOException, KeeperException,
InterruptedException {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ad4ea6b2/server/master/src/main/java/org/apache/accumulo/master/replication/DistributedWorkQueueWorkAssigner.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/DistributedWorkQueueWorkAssigner.java b/server/master/src/main/java/org/apache/accumulo/master/replication/DistributedWorkQueueWorkAssigner.java
index 4815305..234b153 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/replication/DistributedWorkQueueWorkAssigner.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/replication/DistributedWorkQueueWorkAssigner.java
@@ -20,7 +20,6 @@ import java.util.Collection;
 import java.util.Map.Entry;
 import java.util.Set;
 
-import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.TableNotFoundException;
@@ -30,6 +29,7 @@ import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.protobuf.ProtobufUtil;
+import org.apache.accumulo.core.replication.ReplicationConstants;
 import org.apache.accumulo.core.replication.ReplicationSchema.OrderSection;
 import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
 import org.apache.accumulo.core.replication.ReplicationTarget;
@@ -114,7 +114,7 @@ public abstract class DistributedWorkQueueWorkAssigner implements WorkAssigner
{
    * @param conf
    */
   protected void initializeWorkQueue(AccumuloConfiguration conf) {
-    workQueue = new DistributedWorkQueue(ZooUtil.getRoot(conn.getInstance()) + Constants.ZREPLICATION_WORK_QUEUE,
conf);
+    workQueue = new DistributedWorkQueue(ZooUtil.getRoot(conn.getInstance()) + ReplicationConstants.ZOO_WORK_QUEUE,
conf);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ad4ea6b2/server/master/src/main/java/org/apache/accumulo/master/replication/MasterReplicationCoordinator.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/MasterReplicationCoordinator.java b/server/master/src/main/java/org/apache/accumulo/master/replication/MasterReplicationCoordinator.java
index 974aaa9..11c04be 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/replication/MasterReplicationCoordinator.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/replication/MasterReplicationCoordinator.java
@@ -21,9 +21,9 @@ import java.util.Iterator;
 import java.util.Random;
 import java.util.Set;
 
-import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
+import org.apache.accumulo.core.replication.ReplicationConstants;
 import org.apache.accumulo.core.replication.thrift.ReplicationCoordinator;
 import org.apache.accumulo.core.replication.thrift.ReplicationCoordinatorErrorCode;
 import org.apache.accumulo.core.replication.thrift.ReplicationCoordinatorException;
@@ -83,7 +83,7 @@ public class MasterReplicationCoordinator implements ReplicationCoordinator.Ifac
     TServerInstance tserver = getRandomTServer(tservers, rand.nextInt(tservers.size()));
     String replServiceAddr;
     try {
-      replServiceAddr = new String(reader.getData(ZooUtil.getRoot(inst) + Constants.ZREPLICATION_TSERVERS
+ "/" + tserver.hostPort(), null), StandardCharsets.UTF_8);
+      replServiceAddr = new String(reader.getData(ZooUtil.getRoot(inst) + ReplicationConstants.ZOO_TSERVERS
+ "/" + tserver.hostPort(), null), StandardCharsets.UTF_8);
     } catch (KeeperException | InterruptedException e) {
       log.error("Could not fetch repliation service port for tserver", e);
       throw new ReplicationCoordinatorException(ReplicationCoordinatorErrorCode.SERVICE_CONFIGURATION_UNAVAILABLE,

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ad4ea6b2/server/master/src/main/java/org/apache/accumulo/master/replication/SequentialWorkAssigner.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/SequentialWorkAssigner.java b/server/master/src/main/java/org/apache/accumulo/master/replication/SequentialWorkAssigner.java
index e56763e..4b2936c 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/replication/SequentialWorkAssigner.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/replication/SequentialWorkAssigner.java
@@ -24,9 +24,9 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 
-import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.replication.ReplicationConstants;
 import org.apache.accumulo.core.replication.ReplicationTarget;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
 import org.apache.accumulo.server.replication.DistributedWorkQueueWorkAssignerHelper;
@@ -137,7 +137,7 @@ public class SequentialWorkAssigner extends DistributedWorkQueueWorkAssigner
{
         // tableID -> workKey
         Entry<String,String> entry = iter.next();
         // Null equates to the work for this target was finished
-        if (null == zooCache.get(ZooUtil.getRoot(instanceId) + Constants.ZREPLICATION_WORK_QUEUE
+ "/" + entry.getValue())) {
+        if (null == zooCache.get(ZooUtil.getRoot(instanceId) + ReplicationConstants.ZOO_WORK_QUEUE
+ "/" + entry.getValue())) {
           log.debug("Removing {} from work assignment state", entry.getValue());
           iter.remove();
           elementsRemoved++;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ad4ea6b2/server/master/src/main/java/org/apache/accumulo/master/replication/UnorderedWorkAssigner.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/UnorderedWorkAssigner.java b/server/master/src/main/java/org/apache/accumulo/master/replication/UnorderedWorkAssigner.java
index b6706ef..9042e2d 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/replication/UnorderedWorkAssigner.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/replication/UnorderedWorkAssigner.java
@@ -20,9 +20,9 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Set;
 
-import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.replication.ReplicationConstants;
 import org.apache.accumulo.core.replication.ReplicationTarget;
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
@@ -133,7 +133,7 @@ public class UnorderedWorkAssigner extends DistributedWorkQueueWorkAssigner
{
     while (work.hasNext()) {
       String filename = work.next();
       // Null equates to the work was finished
-      if (null == zooCache.get(ZooUtil.getRoot(instanceId) + Constants.ZREPLICATION_WORK_QUEUE
+ "/" + filename)) {
+      if (null == zooCache.get(ZooUtil.getRoot(instanceId) + ReplicationConstants.ZOO_WORK_QUEUE
+ "/" + filename)) {
         work.remove();
       }
     }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ad4ea6b2/server/master/src/test/java/org/apache/accumulo/master/replication/SequentialWorkAssignerTest.java
----------------------------------------------------------------------
diff --git a/server/master/src/test/java/org/apache/accumulo/master/replication/SequentialWorkAssignerTest.java b/server/master/src/test/java/org/apache/accumulo/master/replication/SequentialWorkAssignerTest.java
index e7ff4ca..3d96ea1 100644
--- a/server/master/src/test/java/org/apache/accumulo/master/replication/SequentialWorkAssignerTest.java
+++ b/server/master/src/test/java/org/apache/accumulo/master/replication/SequentialWorkAssignerTest.java
@@ -26,7 +26,6 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.TreeMap;
 
-import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.Instance;
@@ -35,6 +34,7 @@ import org.apache.accumulo.core.client.security.tokens.PasswordToken;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.protobuf.ProtobufUtil;
+import org.apache.accumulo.core.replication.ReplicationConstants;
 import org.apache.accumulo.core.replication.ReplicationSchema.OrderSection;
 import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
 import org.apache.accumulo.core.replication.ReplicationTarget;
@@ -329,11 +329,11 @@ public class SequentialWorkAssignerTest {
 
     // file1 replicated
     expect(
-        zooCache.get(ZooUtil.getRoot("instance") + Constants.ZREPLICATION_WORK_QUEUE + "/"
+        zooCache.get(ZooUtil.getRoot("instance") + ReplicationConstants.ZOO_WORK_QUEUE +
"/"
             + DistributedWorkQueueWorkAssignerHelper.getQueueKey("file1", new ReplicationTarget("cluster1",
"1", "1")))).andReturn(null);
     // file2 still needs to replicate
     expect(
-        zooCache.get(ZooUtil.getRoot("instance") + Constants.ZREPLICATION_WORK_QUEUE + "/"
+        zooCache.get(ZooUtil.getRoot("instance") + ReplicationConstants.ZOO_WORK_QUEUE +
"/"
             + DistributedWorkQueueWorkAssignerHelper.getQueueKey("file2", new ReplicationTarget("cluster1",
"2", "2")))).andReturn(new byte[0]);
 
     replay(workQueue, zooCache, conn, inst);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ad4ea6b2/server/master/src/test/java/org/apache/accumulo/master/replication/UnorderedWorkAssignerTest.java
----------------------------------------------------------------------
diff --git a/server/master/src/test/java/org/apache/accumulo/master/replication/UnorderedWorkAssignerTest.java b/server/master/src/test/java/org/apache/accumulo/master/replication/UnorderedWorkAssignerTest.java
index 0c9384e..2199808 100644
--- a/server/master/src/test/java/org/apache/accumulo/master/replication/UnorderedWorkAssignerTest.java
+++ b/server/master/src/test/java/org/apache/accumulo/master/replication/UnorderedWorkAssignerTest.java
@@ -38,6 +38,7 @@ import org.apache.accumulo.core.client.security.tokens.PasswordToken;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.protobuf.ProtobufUtil;
+import org.apache.accumulo.core.replication.ReplicationConstants;
 import org.apache.accumulo.core.replication.ReplicationSchema.OrderSection;
 import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
 import org.apache.accumulo.core.replication.ReplicationTarget;
@@ -238,8 +239,8 @@ public class UnorderedWorkAssignerTest {
 
     expect(conn.getInstance()).andReturn(inst);
     expect(inst.getInstanceID()).andReturn("id");
-    expect(cache.get(Constants.ZROOT + "/id" + Constants.ZREPLICATION_WORK_QUEUE + "/wal1")).andReturn(null);
-    expect(cache.get(Constants.ZROOT + "/id" + Constants.ZREPLICATION_WORK_QUEUE + "/wal2")).andReturn(null);
+    expect(cache.get(Constants.ZROOT + "/id" + ReplicationConstants.ZOO_WORK_QUEUE + "/wal1")).andReturn(null);
+    expect(cache.get(Constants.ZROOT + "/id" + ReplicationConstants.ZOO_WORK_QUEUE + "/wal2")).andReturn(null);
 
     replay(cache, inst, conn);
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ad4ea6b2/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ReplicationServlet.java
----------------------------------------------------------------------
diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ReplicationServlet.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ReplicationServlet.java
index ab83b4a..9936946 100644
--- a/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ReplicationServlet.java
+++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ReplicationServlet.java
@@ -27,7 +27,6 @@ import java.util.Set;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
-import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.BatchScanner;
 import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.Instance;
@@ -42,6 +41,7 @@ import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.metadata.MetadataTable;
 import org.apache.accumulo.core.metadata.RootTable;
+import org.apache.accumulo.core.replication.ReplicationConstants;
 import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
 import org.apache.accumulo.core.replication.ReplicationTarget;
 import org.apache.accumulo.core.replication.StatusUtil;
@@ -207,7 +207,7 @@ public class ReplicationServlet extends BasicServlet {
 
     // Read the files from the workqueue in zk
     String zkRoot = ZooUtil.getRoot(inst);
-    final String workQueuePath = zkRoot + Constants.ZREPLICATION_WORK_QUEUE;
+    final String workQueuePath = zkRoot + ReplicationConstants.ZOO_WORK_QUEUE;
 
     DistributedWorkQueue workQueue = new DistributedWorkQueue(workQueuePath, ServerConfiguration.getSystemConfiguration(inst));
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ad4ea6b2/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index dd3c16e..e1a62de 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -111,6 +111,7 @@ import org.apache.accumulo.core.master.thrift.TabletServerStatus;
 import org.apache.accumulo.core.metadata.MetadataTable;
 import org.apache.accumulo.core.metadata.RootTable;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
+import org.apache.accumulo.core.replication.ReplicationConstants;
 import org.apache.accumulo.core.replication.thrift.ReplicationServicer;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.security.SecurityUtil;
@@ -2893,7 +2894,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
     try {
       // The replication service is unique to the thrift service for a tserver, not just
a host.
       // Advertise the host and port for replication service given the host and port for
the tserver.
-      ZooReaderWriter.getInstance().putPersistentData(ZooUtil.getRoot(getInstance()) + Constants.ZREPLICATION_TSERVERS
+ "/" + clientAddress.toString(),
+      ZooReaderWriter.getInstance().putPersistentData(ZooUtil.getRoot(getInstance()) + ReplicationConstants.ZOO_TSERVERS
+ "/" + clientAddress.toString(),
           sp.address.toString().getBytes(StandardCharsets.UTF_8), NodeExistsPolicy.OVERWRITE);
     } catch (Exception e) {
       log.error("Could not advertise replication service port", e);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ad4ea6b2/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationWorker.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationWorker.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationWorker.java
index a223511..20da0d6 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationWorker.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationWorker.java
@@ -18,11 +18,11 @@ package org.apache.accumulo.tserver.replication;
 
 import java.util.concurrent.ThreadPoolExecutor;
 
-import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.DefaultConfiguration;
 import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.replication.ReplicationConstants;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
 import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.accumulo.server.security.SystemCredentials;
@@ -63,10 +63,10 @@ public class ReplicationWorker implements Runnable {
       DistributedWorkQueue workQueue;
       if (defaultDelay != delay && defaultPeriod != period) {
         log.debug("Configuration DistributedWorkQueue with delay and period of {} and {}",
delay, period);
-        workQueue = new DistributedWorkQueue(ZooUtil.getRoot(inst) + Constants.ZREPLICATION_WORK_QUEUE,
conf, delay, period);
+        workQueue = new DistributedWorkQueue(ZooUtil.getRoot(inst) + ReplicationConstants.ZOO_WORK_QUEUE,
conf, delay, period);
       } else {
         log.debug("Configuring DistributedWorkQueue with default delay and period");
-        workQueue = new DistributedWorkQueue(ZooUtil.getRoot(inst) + Constants.ZREPLICATION_WORK_QUEUE,
conf);
+        workQueue = new DistributedWorkQueue(ZooUtil.getRoot(inst) + ReplicationConstants.ZOO_WORK_QUEUE,
conf);
       }
 
       workQueue.startProcessing(new ReplicationProcessor(inst, conf, fs, SystemCredentials.get()),
executor);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ad4ea6b2/test/src/test/java/org/apache/accumulo/test/replication/MultiTserverReplicationIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/MultiTserverReplicationIT.java b/test/src/test/java/org/apache/accumulo/test/replication/MultiTserverReplicationIT.java
index 96e8b52..0f4cf5b 100644
--- a/test/src/test/java/org/apache/accumulo/test/replication/MultiTserverReplicationIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/replication/MultiTserverReplicationIT.java
@@ -24,6 +24,7 @@ import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.replication.ReplicationConstants;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
 import org.apache.accumulo.fate.zookeeper.ZooReader;
@@ -68,7 +69,7 @@ public class MultiTserverReplicationIT extends ConfigurableMacIT {
 
     for (String tserver : tserverHost) {
       try {
-        byte[] portData = zreader.getData(ZooUtil.getRoot(inst) + Constants.ZREPLICATION_TSERVERS
+ "/" + tserver, null);
+        byte[] portData = zreader.getData(ZooUtil.getRoot(inst) + ReplicationConstants.ZOO_TSERVERS
+ "/" + tserver, null);
         HostAndPort replAddress = HostAndPort.fromString(new String(portData, StandardCharsets.UTF_8));
         replicationServices.add(replAddress);
       } catch (Exception e) {


Mime
View raw message