accumulo-commits mailing list archives

From ctubb...@apache.org
Subject [9/9] git commit: Merge branch '1.6.1-SNAPSHOT'
Date Sat, 23 Aug 2014 00:38:00 GMT
Merge branch '1.6.1-SNAPSHOT'

Conflicts:
	core/src/main/java/org/apache/accumulo/core/conf/DefaultConfiguration.java
	server/base/src/main/java/org/apache/accumulo/server/conf/ZooCachePropertyAccessor.java
	server/base/src/main/java/org/apache/accumulo/server/conf/ZooConfiguration.java
	server/base/src/test/java/org/apache/accumulo/server/conf/ServerConfigurationFactoryTest.java
	server/base/src/test/java/org/apache/accumulo/server/conf/TableConfigurationTest.java
	server/base/src/test/java/org/apache/accumulo/server/conf/ZooCachePropertyAccessorTest.java
	test/src/test/java/org/apache/accumulo/test/functional/MasterAssignmentIT.java


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/21d3c88d
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/21d3c88d
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/21d3c88d

Branch: refs/heads/master
Commit: 21d3c88d8a2e933baae4648ac4a888e61cdc72f5
Parents: d4d5733 23c74cd
Author: Christopher Tubbs <ctubbsii@apache.org>
Authored: Fri Aug 22 20:35:48 2014 -0400
Committer: Christopher Tubbs <ctubbsii@apache.org>
Committed: Fri Aug 22 20:35:48 2014 -0400

----------------------------------------------------------------------
 .../conf/CredentialProviderFactoryShim.java     |  2 -
 .../core/conf/DefaultConfiguration.java         |  2 +-
 .../client/mapreduce/RangeInputSplitTest.java   | 31 +++++++------
 .../minicluster/MiniAccumuloCluster.java        | 10 ++---
 .../minicluster/MiniAccumuloClusterTest.java    |  1 -
 .../server/conf/NamespaceConfiguration.java     |  3 +-
 .../server/conf/ServerConfigurationFactory.java | 13 ++++--
 .../server/conf/TableConfiguration.java         |  3 +-
 .../server/conf/ZooCachePropertyAccessor.java   | 42 +++++++++++-------
 .../accumulo/server/conf/ZooConfiguration.java  | 18 ++++----
 .../accumulo/server/util/MetadataTableUtil.java |  1 -
 .../conf/ServerConfigurationFactoryTest.java    | 31 -------------
 .../server/conf/TableConfigurationTest.java     | 14 +++---
 .../conf/ZooCachePropertyAccessorTest.java      |  9 ++--
 .../server/fs/VolumeManagerImplTest.java        |  6 +--
 .../accumulo/gc/SimpleGarbageCollector.java     | 46 ++++++++++++--------
 .../monitor/servlets/ReplicationServlet.java    | 17 ++++----
 .../test/functional/MasterAssignmentIT.java     | 18 ++++----
 18 files changed, 128 insertions(+), 139 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/21d3c88d/core/src/main/java/org/apache/accumulo/core/conf/DefaultConfiguration.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/21d3c88d/mapreduce/src/test/java/org/apache/accumulo/core/client/mapreduce/RangeInputSplitTest.java
----------------------------------------------------------------------
diff --cc mapreduce/src/test/java/org/apache/accumulo/core/client/mapreduce/RangeInputSplitTest.java
index 1ea0450,0000000..833e594
mode 100644,000000..100644
--- a/mapreduce/src/test/java/org/apache/accumulo/core/client/mapreduce/RangeInputSplitTest.java
+++ b/mapreduce/src/test/java/org/apache/accumulo/core/client/mapreduce/RangeInputSplitTest.java
@@@ -1,126 -1,0 +1,125 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.core.client.mapreduce;
 +
 +import java.io.ByteArrayInputStream;
 +import java.io.ByteArrayOutputStream;
 +import java.io.DataInputStream;
 +import java.io.DataOutputStream;
 +import java.io.IOException;
 +import java.util.ArrayList;
 +import java.util.Arrays;
 +import java.util.HashSet;
 +import java.util.Set;
 +
 +import org.apache.accumulo.core.client.IteratorSetting;
 +import org.apache.accumulo.core.client.security.tokens.PasswordToken;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.Range;
 +import org.apache.accumulo.core.iterators.user.SummingCombiner;
 +import org.apache.accumulo.core.iterators.user.WholeRowIterator;
 +import org.apache.accumulo.core.security.Authorizations;
 +import org.apache.accumulo.core.util.Pair;
 +import org.apache.hadoop.io.Text;
 +import org.apache.log4j.Level;
 +import org.junit.Assert;
 +import org.junit.Test;
 +
 +public class RangeInputSplitTest {
 +
 +  @Test
 +  public void testSimpleWritable() throws IOException {
-     RangeInputSplit split = new RangeInputSplit("table", "1", new Range(new Key("a"), new Key("b")), new String[]{"localhost"});
-     
++    RangeInputSplit split = new RangeInputSplit("table", "1", new Range(new Key("a"), new Key("b")), new String[] {"localhost"});
++
 +    ByteArrayOutputStream baos = new ByteArrayOutputStream();
 +    DataOutputStream dos = new DataOutputStream(baos);
 +    split.write(dos);
-     
++
 +    RangeInputSplit newSplit = new RangeInputSplit();
-     
++
 +    ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
 +    DataInputStream dis = new DataInputStream(bais);
 +    newSplit.readFields(dis);
-     
++
 +    Assert.assertEquals(split.getTableName(), newSplit.getTableName());
 +    Assert.assertEquals(split.getTableId(), newSplit.getTableId());
 +    Assert.assertEquals(split.getRange(), newSplit.getRange());
 +    Assert.assertTrue(Arrays.equals(split.getLocations(), newSplit.getLocations()));
 +  }
 +
-   @SuppressWarnings("deprecation")
 +  @Test
 +  public void testAllFieldsWritable() throws IOException {
-     RangeInputSplit split = new RangeInputSplit("table", "1", new Range(new Key("a"), new Key("b")), new String[]{"localhost"});
-     
++    RangeInputSplit split = new RangeInputSplit("table", "1", new Range(new Key("a"), new Key("b")), new String[] {"localhost"});
++
 +    Set<Pair<Text,Text>> fetchedColumns = new HashSet<Pair<Text,Text>>();
-     
++
 +    fetchedColumns.add(new Pair<Text,Text>(new Text("colf1"), new Text("colq1")));
 +    fetchedColumns.add(new Pair<Text,Text>(new Text("colf2"), new Text("colq2")));
 +
 +    // Fake some iterators
 +    ArrayList<IteratorSetting> iterators = new ArrayList<IteratorSetting>();
 +    IteratorSetting setting = new IteratorSetting(50, SummingCombiner.class);
 +    setting.addOption("foo", "bar");
 +    iterators.add(setting);
 +
 +    setting = new IteratorSetting(100, WholeRowIterator.class);
 +    setting.addOption("bar", "foo");
 +    iterators.add(setting);
 +
-     split.setTable("table");
++    split.setTableName("table");
 +    split.setAuths(new Authorizations("foo"));
 +    split.setOffline(true);
 +    split.setIsolatedScan(true);
 +    split.setUsesLocalIterators(true);
 +    split.setFetchedColumns(fetchedColumns);
 +    split.setToken(new PasswordToken("password"));
 +    split.setPrincipal("root");
 +    split.setInstanceName("instance");
 +    split.setMockInstance(true);
 +    split.setZooKeepers("localhost");
 +    split.setIterators(iterators);
 +    split.setLogLevel(Level.WARN);
-     
++
 +    ByteArrayOutputStream baos = new ByteArrayOutputStream();
 +    DataOutputStream dos = new DataOutputStream(baos);
 +    split.write(dos);
-     
++
 +    RangeInputSplit newSplit = new RangeInputSplit();
-     
++
 +    ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
 +    DataInputStream dis = new DataInputStream(bais);
 +    newSplit.readFields(dis);
-     
++
 +    Assert.assertEquals(split.getRange(), newSplit.getRange());
 +    Assert.assertArrayEquals(split.getLocations(), newSplit.getLocations());
 +
-     Assert.assertEquals(split.getTable(), newSplit.getTable());
++    Assert.assertEquals(split.getTableName(), newSplit.getTableName());
 +    Assert.assertEquals(split.getAuths(), newSplit.getAuths());
 +    Assert.assertEquals(split.isOffline(), newSplit.isOffline());
 +    Assert.assertEquals(split.isIsolatedScan(), newSplit.isOffline());
 +    Assert.assertEquals(split.usesLocalIterators(), newSplit.usesLocalIterators());
 +    Assert.assertEquals(split.getFetchedColumns(), newSplit.getFetchedColumns());
 +    Assert.assertEquals(split.getToken(), newSplit.getToken());
 +    Assert.assertEquals(split.getPrincipal(), newSplit.getPrincipal());
 +    Assert.assertEquals(split.getInstanceName(), newSplit.getInstanceName());
 +    Assert.assertEquals(split.isMockInstance(), newSplit.isMockInstance());
 +    Assert.assertEquals(split.getZooKeepers(), newSplit.getZooKeepers());
 +    Assert.assertEquals(split.getIterators(), newSplit.getIterators());
 +    Assert.assertEquals(split.getLogLevel(), newSplit.getLogLevel());
 +  }
-   
++
 +}
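
Both tests above exercise the same Hadoop Writable round-trip: a populated RangeInputSplit is serialized with write(DataOutput) and rebuilt on a fresh instance with readFields(DataInput), after which each getter is compared. A minimal sketch of that pattern, assuming only the standard Hadoop Writable contract (the helper class name is illustrative, not part of the Accumulo or Hadoop API):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    import org.apache.hadoop.io.Writable;

    // Hypothetical helper showing the serialize/deserialize cycle used by the tests above.
    public final class WritableRoundTrip {
      private WritableRoundTrip() {}

      public static void roundTrip(Writable source, Writable destination) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputStream dos = new DataOutputStream(baos);
        source.write(dos);            // serialize the populated instance
        DataInputStream dis = new DataInputStream(new ByteArrayInputStream(baos.toByteArray()));
        destination.readFields(dis);  // repopulate a freshly constructed instance
      }
    }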

http://git-wip-us.apache.org/repos/asf/accumulo/blob/21d3c88d/server/base/src/main/java/org/apache/accumulo/server/conf/ServerConfigurationFactory.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/21d3c88d/server/base/src/main/java/org/apache/accumulo/server/conf/ZooCachePropertyAccessor.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/21d3c88d/server/base/src/main/java/org/apache/accumulo/server/conf/ZooConfiguration.java
----------------------------------------------------------------------
diff --cc server/base/src/main/java/org/apache/accumulo/server/conf/ZooConfiguration.java
index 09cf8df,a49ec43..0e9ee95
--- a/server/base/src/main/java/org/apache/accumulo/server/conf/ZooConfiguration.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/conf/ZooConfiguration.java
@@@ -97,10 -96,10 +97,10 @@@ public class ZooConfiguration extends A
      byte[] v = propCache.get(zPath);
      String value = null;
      if (v != null)
 -      value = new String(v, Constants.UTF8);
 +      value = new String(v, StandardCharsets.UTF_8);
      return value;
    }
-   
+ 
    @Override
    public void getProperties(Map<String,String> props, PropertyFilter filter) {
      parent.getProperties(props, filter);
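
The hunk above is representative of a change carried throughout this merge: byte-to-String decoding moves from the project-local Constants.UTF8 constant to the JDK's StandardCharsets.UTF_8. A minimal standalone sketch of the resulting pattern (the class and method names are illustrative only):

    import java.nio.charset.StandardCharsets;

    public class Utf8Decode {
      // Decode a possibly-null byte[] as UTF-8, mirroring the null check in the hunk above.
      public static String decodeOrNull(byte[] v) {
        return v == null ? null : new String(v, StandardCharsets.UTF_8);
      }
    }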

http://git-wip-us.apache.org/repos/asf/accumulo/blob/21d3c88d/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/21d3c88d/server/base/src/test/java/org/apache/accumulo/server/conf/ServerConfigurationFactoryTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/21d3c88d/server/base/src/test/java/org/apache/accumulo/server/conf/TableConfigurationTest.java
----------------------------------------------------------------------
diff --cc server/base/src/test/java/org/apache/accumulo/server/conf/TableConfigurationTest.java
index 990e82a,92580f3..12521f9
--- a/server/base/src/test/java/org/apache/accumulo/server/conf/TableConfigurationTest.java
+++ b/server/base/src/test/java/org/apache/accumulo/server/conf/TableConfigurationTest.java
@@@ -84,8 -83,8 +84,8 @@@ public class TableConfigurationTest 
    @Test
    public void testGet_InZK() {
      Property p = Property.INSTANCE_SECRET;
--    expect(zc.get(ZooUtil.getRoot(iid) + Constants.ZTABLES + "/" + TID + Constants.ZTABLE_CONF + "/" + p.getKey()))
-         .andReturn("sekrit".getBytes(StandardCharsets.UTF_8));
 -        .andReturn("sekrit".getBytes(Constants.UTF8));
++    expect(zc.get(ZooUtil.getRoot(iid) + Constants.ZTABLES + "/" + TID + Constants.ZTABLE_CONF + "/" + p.getKey())).andReturn(
++        "sekrit".getBytes(StandardCharsets.UTF_8));
      replay(zc);
      assertEquals("sekrit", c.get(Property.INSTANCE_SECRET));
    }
@@@ -110,8 -109,8 +110,10 @@@
      children.add("foo");
      children.add("ding");
      expect(zc.getChildren(ZooUtil.getRoot(iid) + Constants.ZTABLES + "/" + TID + Constants.ZTABLE_CONF)).andReturn(children);
-     expect(zc.get(ZooUtil.getRoot(iid) + Constants.ZTABLES + "/" + TID + Constants.ZTABLE_CONF + "/" + "foo")).andReturn("bar".getBytes(StandardCharsets.UTF_8));
-     expect(zc.get(ZooUtil.getRoot(iid) + Constants.ZTABLES + "/" + TID + Constants.ZTABLE_CONF + "/" + "ding")).andReturn("dong".getBytes(StandardCharsets.UTF_8));
 -    expect(zc.get(ZooUtil.getRoot(iid) + Constants.ZTABLES + "/" + TID + Constants.ZTABLE_CONF + "/" + "foo")).andReturn("bar".getBytes(Constants.UTF8));
 -    expect(zc.get(ZooUtil.getRoot(iid) + Constants.ZTABLES + "/" + TID + Constants.ZTABLE_CONF + "/" + "ding")).andReturn("dong".getBytes(Constants.UTF8));
++    expect(zc.get(ZooUtil.getRoot(iid) + Constants.ZTABLES + "/" + TID + Constants.ZTABLE_CONF + "/" + "foo"))
++        .andReturn("bar".getBytes(StandardCharsets.UTF_8));
++    expect(zc.get(ZooUtil.getRoot(iid) + Constants.ZTABLES + "/" + TID + Constants.ZTABLE_CONF + "/" + "ding")).andReturn(
++        "dong".getBytes(StandardCharsets.UTF_8));
      replay(zc);
      c.getProperties(props, filter);
      assertEquals(2, props.size());
@@@ -135,8 -134,8 +137,8 @@@
    public void testInvalidateCache() {
      // need to do a get so the accessor is created
      Property p = Property.INSTANCE_SECRET;
--    expect(zc.get(ZooUtil.getRoot(iid) + Constants.ZTABLES + "/" + TID + Constants.ZTABLE_CONF + "/" + p.getKey()))
-         .andReturn("sekrit".getBytes(StandardCharsets.UTF_8));
 -        .andReturn("sekrit".getBytes(Constants.UTF8));
++    expect(zc.get(ZooUtil.getRoot(iid) + Constants.ZTABLES + "/" + TID + Constants.ZTABLE_CONF + "/" + p.getKey())).andReturn(
++        "sekrit".getBytes(StandardCharsets.UTF_8));
      zc.clear();
      replay(zc);
      c.get(Property.INSTANCE_SECRET);
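
These configuration tests follow the usual EasyMock expect/replay pattern: record the ZooCache lookups the code under test may make, switch the mock into replay mode, then exercise the configuration object. A small self-contained sketch of that pattern; the KeyValueStore interface and the path string are stand-ins for the real ZooCache fixture, not Accumulo types:

    import static org.easymock.EasyMock.createMock;
    import static org.easymock.EasyMock.expect;
    import static org.easymock.EasyMock.replay;
    import static org.easymock.EasyMock.verify;

    import java.nio.charset.StandardCharsets;

    public class ExpectReplayExample {
      // Hypothetical stand-in for ZooCache.
      interface KeyValueStore {
        byte[] get(String path);
      }

      public byte[] lookup() {
        KeyValueStore store = createMock(KeyValueStore.class);
        expect(store.get("/tables/t1/conf/instance.secret")).andReturn("sekrit".getBytes(StandardCharsets.UTF_8));
        replay(store);                 // recording is done; the mock now enforces the expectation
        byte[] value = store.get("/tables/t1/conf/instance.secret");
        verify(store);                 // optional here, but confirms the expected lookup happened
        return value;
      }
    }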

http://git-wip-us.apache.org/repos/asf/accumulo/blob/21d3c88d/server/base/src/test/java/org/apache/accumulo/server/conf/ZooCachePropertyAccessorTest.java
----------------------------------------------------------------------
diff --cc server/base/src/test/java/org/apache/accumulo/server/conf/ZooCachePropertyAccessorTest.java
index 446c2a1,1b9d8f7..742ccdd
--- a/server/base/src/test/java/org/apache/accumulo/server/conf/ZooCachePropertyAccessorTest.java
+++ b/server/base/src/test/java/org/apache/accumulo/server/conf/ZooCachePropertyAccessorTest.java
@@@ -39,8 -39,9 +39,9 @@@ public class ZooCachePropertyAccessorTe
    private static final String PATH = "/root/path/to/props";
    private static final Property PROP = Property.INSTANCE_SECRET;
    private static final String KEY = PROP.getKey();
+   private static final String FULL_PATH = PATH + "/" + KEY;
    private static final String VALUE = "value";
 -  private static final byte[] VALUE_BYTES = VALUE.getBytes(Constants.UTF8);
 +  private static final byte[] VALUE_BYTES = VALUE.getBytes(StandardCharsets.UTF_8);
  
    private ZooCache zc;
    private ZooCachePropertyAccessor a;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/21d3c88d/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/21d3c88d/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ReplicationServlet.java
----------------------------------------------------------------------
diff --cc server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ReplicationServlet.java
index 7a7941a,0000000..1cb6abf
mode 100644,000000..100644
--- a/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ReplicationServlet.java
+++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ReplicationServlet.java
@@@ -1,168 -1,0 +1,169 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.monitor.servlets;
 +
 +import java.util.Map;
 +import java.util.Map.Entry;
 +import java.util.Set;
 +
 +import javax.servlet.http.HttpServletRequest;
 +import javax.servlet.http.HttpServletResponse;
 +
 +import org.apache.accumulo.core.client.Connector;
 +import org.apache.accumulo.core.client.Instance;
 +import org.apache.accumulo.core.client.admin.TableOperations;
- import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.master.thrift.MasterMonitorInfo;
 +import org.apache.accumulo.core.replication.ReplicationConstants;
 +import org.apache.accumulo.core.replication.ReplicationTarget;
 +import org.apache.accumulo.core.security.Credentials;
 +import org.apache.accumulo.core.zookeeper.ZooUtil;
 +import org.apache.accumulo.monitor.Monitor;
 +import org.apache.accumulo.monitor.util.Table;
 +import org.apache.accumulo.monitor.util.celltypes.NumberType;
 +import org.apache.accumulo.server.client.HdfsZooInstance;
 +import org.apache.accumulo.server.conf.ServerConfigurationFactory;
 +import org.apache.accumulo.server.replication.DistributedWorkQueueWorkAssignerHelper;
 +import org.apache.accumulo.server.replication.ReplicationUtil;
 +import org.apache.accumulo.server.security.SystemCredentials;
 +import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
 +import org.apache.zookeeper.KeeperException;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +/**
-  * 
++ *
 + */
 +public class ReplicationServlet extends BasicServlet {
 +  private static final Logger log = LoggerFactory.getLogger(ReplicationServlet.class);
 +
 +  private static final long serialVersionUID = 1L;
 +
 +  private ReplicationUtil replicationUtil;
 +
 +  public ReplicationServlet() {
 +    replicationUtil = new ReplicationUtil();
 +  }
 +
 +  @Override
 +  protected String getTitle(HttpServletRequest req) {
 +    return "Replication Overview";
 +  }
-   
++
 +  @Override
 +  protected void pageBody(HttpServletRequest req, HttpServletResponse response, StringBuilder sb) throws Exception {
 +    final Instance inst = HdfsZooInstance.getInstance();
 +    final Credentials creds = SystemCredentials.get();
 +    final Connector conn = inst.getConnector(creds.getPrincipal(), creds.getToken());
 +    final Map<String,String> systemProps = conn.instanceOperations().getSystemConfiguration();
 +    final MasterMonitorInfo mmi = Monitor.getMmi();
 +
 +    // The total number of "slots" we have to replicate data
 +    int totalWorkQueueSize = replicationUtil.getMaxReplicationThreads(systemProps, mmi);
 +
 +    TableOperations tops = conn.tableOperations();
 +    if (!tops.exists(ReplicationConstants.TABLE_NAME)) {
 +      banner(sb, "", "Replication table does not yet exist");
 +      return;
 +    }
 +
 +    Table replicationStats = new Table("replicationStats", "Replication Status");
 +    replicationStats.addSortableColumn("Table");
 +    replicationStats.addSortableColumn("Peer");
 +    replicationStats.addSortableColumn("Remote Identifier");
 +    replicationStats.addSortableColumn("ReplicaSystem Type");
 +    replicationStats.addSortableColumn("Files needing replication", new NumberType<Long>(), null);
 +
 +    Map<String,String> peers = replicationUtil.getPeers(systemProps);
 +
 +    // The total set of configured targets
 +    Set<ReplicationTarget> allConfiguredTargets = replicationUtil.getReplicationTargets(tops);
 +
 +    // Number of files per target we have to replicate
 +    Map<ReplicationTarget,Long> targetCounts = replicationUtil.getPendingReplications(conn);
 +
 +    Map<String,String> tableNameToId = tops.tableIdMap();
 +    Map<String,String> tableIdToName = replicationUtil.invert(tableNameToId);
 +
 +    long filesPendingOverAllTargets = 0l;
 +    for (ReplicationTarget configuredTarget : allConfiguredTargets) {
 +      String tableName = tableIdToName.get(configuredTarget.getSourceTableId());
 +      if (null == tableName) {
 +        log.trace("Could not determine table name from id {}", configuredTarget.getSourceTableId());
 +        continue;
 +      }
 +
 +      String replicaSystemClass = peers.get(configuredTarget.getPeerName());
 +      if (null == replicaSystemClass) {
 +        log.trace("Could not determine configured ReplicaSystem for {}", configuredTarget.getPeerName());
 +        continue;
 +      }
 +
 +      Long numFiles = targetCounts.get(configuredTarget);
 +
 +      if (null == numFiles) {
-         replicationStats.addRow(tableName, configuredTarget.getPeerName(), configuredTarget.getRemoteIdentifier(), replicaSystemClass, 0); 
++        replicationStats.addRow(tableName, configuredTarget.getPeerName(), configuredTarget.getRemoteIdentifier(), replicaSystemClass, 0);
 +      } else {
 +        replicationStats.addRow(tableName, configuredTarget.getPeerName(), configuredTarget.getRemoteIdentifier(), replicaSystemClass, numFiles);
 +        filesPendingOverAllTargets += numFiles;
 +      }
 +    }
 +
 +    // Up to 2x the number of slots for replication available, WARN
 +    // More than 2x the number of slots for replication available, ERROR
-     NumberType<Long> filesPendingFormat = new NumberType<Long>(Long.valueOf(0), Long.valueOf(2 * totalWorkQueueSize), Long.valueOf(0), Long.valueOf(4 * totalWorkQueueSize));
++    NumberType<Long> filesPendingFormat = new NumberType<Long>(Long.valueOf(0), Long.valueOf(2 * totalWorkQueueSize), Long.valueOf(0),
++        Long.valueOf(4 * totalWorkQueueSize));
 +
 +    String utilization = filesPendingFormat.format(filesPendingOverAllTargets);
 +
 +    sb.append("<div><center><br/><span class=\"table-caption\">Total files pending replication: ").append(utilization).append("</span></center></div>");
 +
 +    replicationStats.generate(req, sb);
 +
 +    // Make a table for the replication data in progress
 +    Table replicationInProgress = new Table("replicationInProgress", "In-Progress Replication");
 +    replicationInProgress.addSortableColumn("File");
 +    replicationInProgress.addSortableColumn("Peer");
 +    replicationInProgress.addSortableColumn("Source Table ID");
 +    replicationInProgress.addSortableColumn("Peer Identifier");
 +    replicationInProgress.addUnsortableColumn("Status");
 +
 +    // Read the files from the workqueue in zk
 +    String zkRoot = ZooUtil.getRoot(inst);
 +    final String workQueuePath = zkRoot + ReplicationConstants.ZOO_WORK_QUEUE;
 +
 +    DistributedWorkQueue workQueue = new DistributedWorkQueue(workQueuePath, new ServerConfigurationFactory(inst).getConfiguration());
 +
 +    try {
 +      for (String queueKey : workQueue.getWorkQueued()) {
 +        Entry<String,ReplicationTarget> queueKeyPair = DistributedWorkQueueWorkAssignerHelper.fromQueueKey(queueKey);
 +        String filename = queueKeyPair.getKey();
 +        ReplicationTarget target = queueKeyPair.getValue();
-   
++
 +        String path = replicationUtil.getAbsolutePath(conn, workQueuePath, queueKey);
 +        String progress = replicationUtil.getProgress(conn, path, target);
-         
++
 +        // Add a row in the table
-         replicationInProgress.addRow(null == path ? ".../" + filename : path, target.getPeerName(), target.getSourceTableId(), target.getRemoteIdentifier(), progress);
++        replicationInProgress.addRow(null == path ? ".../" + filename : path, target.getPeerName(), target.getSourceTableId(), target.getRemoteIdentifier(),
++            progress);
 +      }
 +    } catch (KeeperException | InterruptedException e) {
 +      log.warn("Could not calculate replication in progress", e);
 +    }
 +
 +    replicationInProgress.generate(req, sb);
 +  }
 +}
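
The pending-file count rendered above is banded by the thresholds spelled out in the comments: a backlog of up to 2x the available replication slots is shown as a warning, and more than 2x as an error (the actual coloring is done by the monitor's NumberType). An illustrative restatement of that banding, with the explicit OK case for an empty backlog added as an assumption:

    public class ReplicationBacklog {
      public enum Severity { OK, WARN, ERROR }

      public static Severity classify(long filesPending, int totalReplicationSlots) {
        if (filesPending == 0) {
          return Severity.OK;                             // assumption: no backlog, nothing to flag
        }
        if (filesPending <= 2L * totalReplicationSlots) {
          return Severity.WARN;                           // "up to 2x the number of slots ... WARN"
        }
        return Severity.ERROR;                            // "more than 2x the number of slots ... ERROR"
      }
    }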

