From: ctubbsii@apache.org
To: commits@accumulo.apache.org
Reply-To: dev@accumulo.apache.org
Date: Sat, 09 Jan 2016 03:38:18 -0000
Message-Id: <7bb4386a7fa5426dbf77dcea46b0f011@git.apache.org>
In-Reply-To: <95f169c243204e0dabe939b2e81829d7@git.apache.org>
References: <95f169c243204e0dabe939b2e81829d7@git.apache.org>
X-Mailer: ASF-Git Admin Mailer
Subject: [15/19] accumulo git commit: Merge branch 'javadoc-jdk8-1.6' into javadoc-jdk8-1.7

http://git-wip-us.apache.org/repos/asf/accumulo/blob/6becfbd3/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ReplicationServlet.java
----------------------------------------------------------------------
diff --cc server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ReplicationServlet.java
index 01bd23a,0000000..bf582c7
mode 100644,000000..100644
--- a/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ReplicationServlet.java
+++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ReplicationServlet.java
@@@ -1,167 -1,0 +1,167 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.monitor.servlets;
+
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.admin.TableOperations;
+import org.apache.accumulo.core.master.thrift.MasterMonitorInfo;
+import org.apache.accumulo.core.replication.ReplicationConstants;
+import org.apache.accumulo.core.replication.ReplicationTable;
+import org.apache.accumulo.core.replication.ReplicationTarget;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.monitor.Monitor;
+import org.apache.accumulo.monitor.util.Table;
+import org.apache.accumulo.monitor.util.celltypes.NumberType;
+import org.apache.accumulo.server.replication.DistributedWorkQueueWorkAssignerHelper;
+import org.apache.accumulo.server.replication.ReplicationUtil;
+import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *
+ */
+public class ReplicationServlet extends BasicServlet {
+  private static final Logger log = LoggerFactory.getLogger(ReplicationServlet.class);
+
+  private static final long serialVersionUID = 1L;
+
+  // transient because it's not serializable and servlets are serializable
+  private transient volatile ReplicationUtil replicationUtil = null;
+
+  private synchronized ReplicationUtil getReplicationUtil() {
+    // make transient replicationUtil available as needed
+    if (replicationUtil == null) {
+      replicationUtil = new ReplicationUtil(Monitor.getContext());
+    }
+    return replicationUtil;
+  }
+
+  @Override
+  protected String getTitle(HttpServletRequest req) {
+    return "Replication Overview";
+  }
+
+  @Override
+  protected void pageBody(HttpServletRequest req, HttpServletResponse response, StringBuilder sb) throws Exception {
+    final Connector conn = Monitor.getContext().getConnector();
+    final MasterMonitorInfo mmi = Monitor.getMmi();
+
+    // The total number of "slots" we have to replicate data
+    int totalWorkQueueSize = getReplicationUtil().getMaxReplicationThreads(mmi);
+
+    TableOperations tops = conn.tableOperations();
+    if (!ReplicationTable.isOnline(conn)) {
+      banner(sb, "", "Replication table is offline");
+      return;
+    }
+
+    Table replicationStats = new Table("replicationStats", "Replication Status");
+    replicationStats.addSortableColumn("Table");
+    replicationStats.addSortableColumn("Peer");
+    replicationStats.addSortableColumn("Remote Identifier");
+    replicationStats.addSortableColumn("ReplicaSystem Type");
+    replicationStats.addSortableColumn("Files needing replication", new NumberType(), null);
+
+    Map<String,String> peers = getReplicationUtil().getPeers();
+
+    // The total set of configured targets
+    Set<ReplicationTarget> allConfiguredTargets = getReplicationUtil().getReplicationTargets();
+
+    // Number of files per target we have to replicate
+    Map<ReplicationTarget,Long> targetCounts = getReplicationUtil().getPendingReplications();
+
+    Map<String,String> tableNameToId = tops.tableIdMap();
+    Map<String,String> tableIdToName = getReplicationUtil().invert(tableNameToId);
+
+    long filesPendingOverAllTargets = 0l;
+    for (ReplicationTarget configuredTarget : allConfiguredTargets) {
+      String tableName = tableIdToName.get(configuredTarget.getSourceTableId());
+      if (null == tableName) {
+        log.trace("Could not determine table name from id {}", configuredTarget.getSourceTableId());
+        continue;
+      }
+
+      String replicaSystemClass = peers.get(configuredTarget.getPeerName());
+      if (null == replicaSystemClass) {
+        log.trace("Could not determine configured ReplicaSystem for {}", configuredTarget.getPeerName());
+        continue;
+      }
+
+      Long numFiles = targetCounts.get(configuredTarget);
+
+      if (null == numFiles) {
+        replicationStats.addRow(tableName, configuredTarget.getPeerName(), configuredTarget.getRemoteIdentifier(), replicaSystemClass, 0);
+      } else {
+        replicationStats.addRow(tableName, configuredTarget.getPeerName(), configuredTarget.getRemoteIdentifier(), replicaSystemClass, numFiles);
+        filesPendingOverAllTargets += numFiles;
+      }
+    }
+
+    // More than 2x the number of slots for replication available, WARN
+    // More than 4x the number of slots for replication available, ERROR
+    NumberType filesPendingFormat = new NumberType(Long.valueOf(0), Long.valueOf(2 * totalWorkQueueSize), Long.valueOf(0),
+        Long.valueOf(4 * totalWorkQueueSize));
+
+    String utilization = filesPendingFormat.format(filesPendingOverAllTargets);
+
-     sb.append("Total files pending replication: ").append(utilization).append("");
++    sb.append("Total files pending replication: ").append(utilization).append("");
+
+    replicationStats.generate(req, sb);
+
+    // Make a table for the replication data in progress
+    Table replicationInProgress = new Table("replicationInProgress", "In-Progress Replication");
+    replicationInProgress.addSortableColumn("File");
+    replicationInProgress.addSortableColumn("Peer");
+    replicationInProgress.addSortableColumn("Source Table ID");
+    replicationInProgress.addSortableColumn("Peer Identifier");
+    replicationInProgress.addUnsortableColumn("Status");
+
+    // Read the files from the workqueue in zk
+    String zkRoot = ZooUtil.getRoot(Monitor.getContext().getInstance());
+    final String workQueuePath = zkRoot + ReplicationConstants.ZOO_WORK_QUEUE;
+
+    DistributedWorkQueue workQueue = new DistributedWorkQueue(workQueuePath, Monitor.getContext().getConfiguration());
+
+    try {
+      for (String queueKey : workQueue.getWorkQueued()) {
+        Entry<String,ReplicationTarget> queueKeyPair = DistributedWorkQueueWorkAssignerHelper.fromQueueKey(queueKey);
+        String filename = queueKeyPair.getKey();
+        ReplicationTarget target = queueKeyPair.getValue();
+
+        String path = getReplicationUtil().getAbsolutePath(conn, workQueuePath, queueKey);
+        String progress = getReplicationUtil().getProgress(conn, path, target);
+
+        // Add a row in the table
+        replicationInProgress.addRow(null == path ? ".../" + filename : path, target.getPeerName(), target.getSourceTableId(), target.getRemoteIdentifier(),
+            progress);
+      }
+    } catch (KeeperException | InterruptedException e) {
+      log.warn("Could not calculate replication in progress", e);
+    }
+
+    replicationInProgress.generate(req, sb);
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/6becfbd3/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/TablesServlet.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/6becfbd3/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/CompactionStrategy.java
----------------------------------------------------------------------
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/CompactionStrategy.java
index 40cb604,fa0b68b..0e0089a
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/CompactionStrategy.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/CompactionStrategy.java
@@@ -43,12 -43,6 +43,12 @@@ public abstract class CompactionStrateg
    * {@link #getCompactionPlan(MajorCompactionRequest)}) that it does not need to. Any state stored during shouldCompact will no longer exist when
    * {@link #gatherInformation(MajorCompactionRequest)} and {@link #getCompactionPlan(MajorCompactionRequest)} are called.
    *
-   * <p/>
++  * <p>
+   * Called while holding the tablet lock, so it should not be doing any blocking.
+   *
-   * <p/>
++  * <p>
+   * Since no blocking should be done in this method, it is unexpected that this method will throw an IOException. However, since it is in the API, it cannot
+   * be easily removed.
    */
   public abstract boolean shouldCompact(MajorCompactionRequest request) throws IOException;
@@@ -64,10 -58,6 +64,10 @@@
   /**
    * Get the plan for compacting a tablet's files. Called while holding the tablet lock, so it should not be doing any blocking.
    *
-   * <p/>
++  * <p>
+   * Since no blocking should be done in this method, it is unexpected that this method will throw an IOException. However, since it is in the API, it cannot
+   * be easily removed.
+   *
   * @param request
   *          basic details about the tablet
   * @return the plan for a major compaction, or null to cancel the compaction.
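
For readers of this thread, the contract spelled out in the javadoc above (both callbacks run while the tablet lock is held, must not block, and cannot rely on state surviving between shouldCompact and getCompactionPlan) is easiest to see in a tiny strategy. The sketch below is illustrative only and not part of this commit; the class name is made up, and it assumes the 1.7-era API surface (MajorCompactionRequest.getFiles() and the CompactionPlan.inputFiles list).

import java.io.IOException;

import org.apache.accumulo.tserver.compaction.CompactionPlan;
import org.apache.accumulo.tserver.compaction.CompactionStrategy;
import org.apache.accumulo.tserver.compaction.MajorCompactionRequest;

// Hypothetical example, not part of this commit: compact everything once a
// tablet accumulates THRESHOLD files.
public class EverythingOverThresholdStrategy extends CompactionStrategy {

  private static final int THRESHOLD = 10;

  @Override
  public boolean shouldCompact(MajorCompactionRequest request) throws IOException {
    // Decide from the request alone and do no I/O: this runs under the tablet lock,
    // and nothing stored here is guaranteed to survive until getCompactionPlan().
    return request.getFiles().size() >= THRESHOLD;
  }

  @Override
  public CompactionPlan getCompactionPlan(MajorCompactionRequest request) throws IOException {
    if (request.getFiles().size() < THRESHOLD) {
      return null; // per the javadoc above, null cancels the compaction
    }
    CompactionPlan plan = new CompactionPlan();
    // Compact every file currently in the tablet.
    plan.inputFiles.addAll(request.getFiles().keySet());
    return plan;
  }
}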

http://git-wip-us.apache.org/repos/asf/accumulo/blob/6becfbd3/test/src/main/java/org/apache/accumulo/test/replication/merkle/package-info.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/replication/merkle/package-info.java
index 6afcdf5,0000000..fd19658
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/replication/merkle/package-info.java
+++ b/test/src/main/java/org/apache/accumulo/test/replication/merkle/package-info.java
@@@ -1,38 -1,0 +1,39 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * A Merkle tree is a hash tree and can be used to evaluate equality over large
+ * files with the ability to ascertain what portions of the files differ. Each leaf of the Merkle tree is some hash of a
+ * portion of the file, with each leaf corresponding to some "range" within the source file. As such, if all leaves are
+ * considered as ranges of the source file, the "sum" of all leaves creates a contiguous range over the entire file.
-  * <p/>
++ * <p>
+ * The parent of any nodes (typically two, as in a binary tree, although this is not required) is the hash of the concatenation of the hashes of
+ * the children. We can construct a full tree by walking up the tree, creating parents from children, until we have a root
+ * node. To check equality of two files that each have a Merkle tree built, we can very easily compare the value at the
+ * root of the Merkle tree to know whether or not the files are the same.
-  * <p/>
++ * <p>
+ * Additionally, in the situation where we have two files which we expect to be the same but are not, we can walk back down
+ * the tree, finding subtrees that are equal and subtrees that are not. Subtrees that are equal correspond to portions of
+ * the files which are identical, while subtrees that are not equal correspond to discrepancies between the two files.
-  * <p/>
++ * <p>
+ * We can apply this concept to Accumulo, treating a table as a file and ranges within that table as Accumulo Ranges. We can
+ * then compute the hashes over each of these Ranges and compute the entire Merkle tree to determine if two tables are
+ * equivalent.
+ *
+ * @since 1.7.0
+ */
- package org.apache.accumulo.test.replication.merkle;
++package org.apache.accumulo.test.replication.merkle;
++
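
Before the iterator below, a self-contained sketch of the tree construction this package javadoc describes; it is not part of this commit and uses no Accumulo classes. It folds a list of leaf hashes into a single root by hashing the concatenation of each pair of children; comparing two roots answers the equality question, and descending into unequal subtrees localizes the differences.

import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.List;

// Illustrative only: build a Merkle root from per-range leaf hashes.
public class MerkleRootSketch {

  static byte[] root(List<byte[]> leaves, String algorithm) throws NoSuchAlgorithmException {
    List<byte[]> level = new ArrayList<>(leaves);
    while (level.size() > 1) {
      List<byte[]> parents = new ArrayList<>();
      for (int i = 0; i < level.size(); i += 2) {
        if (i + 1 == level.size()) {
          parents.add(level.get(i)); // odd node out: promote it unchanged
        } else {
          MessageDigest digest = MessageDigest.getInstance(algorithm);
          digest.update(level.get(i)); // parent = hash(concat(left, right))
          digest.update(level.get(i + 1));
          parents.add(digest.digest());
        }
      }
      level = parents;
    }
    return level.get(0);
  }

  public static void main(String[] args) throws NoSuchAlgorithmException {
    // Stand-ins for per-Range digests; in the real test these would come from scans.
    MessageDigest leafDigest = MessageDigest.getInstance("SHA-1");
    List<byte[]> leaves = new ArrayList<>();
    for (String range : new String[] {"a-g", "h-m", "n-t", "u-z"}) {
      leaves.add(leafDigest.digest(range.getBytes(StandardCharsets.UTF_8)));
    }
    StringBuilder hex = new StringBuilder();
    for (byte b : root(leaves, "SHA-1")) {
      hex.append(String.format("%02x", b));
    }
    System.out.println(hex);
  }
}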

http://git-wip-us.apache.org/repos/asf/accumulo/blob/6becfbd3/test/src/main/java/org/apache/accumulo/test/replication/merkle/skvi/DigestIterator.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/replication/merkle/skvi/DigestIterator.java
index 5fa9a5f,0000000..769241e
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/replication/merkle/skvi/DigestIterator.java
+++ b/test/src/main/java/org/apache/accumulo/test/replication/merkle/skvi/DigestIterator.java
@@@ -1,149 -1,0 +1,149 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.replication.merkle.skvi;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.Collection;
+import java.util.Map;
+
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+
+/**
+ * {@link SortedKeyValueIterator} which attempts to compute a hash over some range of Key-Value pairs.
-  * <p/>
++ * <p>
+ * For the purposes of constructing a Merkle tree, this class will only generate a meaningful result if the (Batch)Scanner will compute a single digest over a
+ * Range. If the (Batch)Scanner stops and restarts in the middle of a session, incorrect values will be returned and the Merkle tree will be invalid.
+ */
+public class DigestIterator implements SortedKeyValueIterator<Key,Value> {
+  public static final String HASH_NAME_KEY = "hash.name";
+
+  private MessageDigest digest;
+  private Key topKey;
+  private Value topValue;
+  private SortedKeyValueIterator<Key,Value> source;
+
+  @Override
+  public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
+    String hashName = options.get(HASH_NAME_KEY);
+    if (null == hashName) {
+      throw new IOException(HASH_NAME_KEY + " must be provided as option");
+    }
+
+    try {
+      this.digest = MessageDigest.getInstance(hashName);
+    } catch (NoSuchAlgorithmException e) {
+      throw new IOException(e);
+    }
+
+    this.topKey = null;
+    this.topValue = null;
+    this.source = source;
+  }
+
+  @Override
+  public boolean hasTop() {
+    return null != topKey;
+  }
+
+  @Override
+  public void next() throws IOException {
+    // We can't call next() if we already consumed it all
+    if (!this.source.hasTop()) {
+      this.topKey = null;
+      this.topValue = null;
+      return;
+    }
+
+    this.source.next();
+
+    consume();
+  }
+
+  @Override
+  public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
+    this.source.seek(range, columnFamilies, inclusive);
+
+    consume();
+  }
+
+  protected void consume() throws IOException {
+    digest.reset();
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    DataOutputStream dos = new DataOutputStream(baos);
+
+    if (!this.source.hasTop()) {
+      this.topKey = null;
+      this.topValue = null;
+
+      return;
+    }
+
+    Key lastKeySeen = null;
+    while (this.source.hasTop()) {
+      baos.reset();
+
+      Key currentKey = this.source.getTopKey();
+      lastKeySeen = currentKey;
+
+      currentKey.write(dos);
+      this.source.getTopValue().write(dos);
+
+      digest.update(baos.toByteArray());
+
+      this.source.next();
+    }
+
+    this.topKey = lastKeySeen;
+    this.topValue = new Value(digest.digest());
+  }
+
+  @Override
+  public Key getTopKey() {
+    return topKey;
+  }
+
+  @Override
+  public Value getTopValue() {
+    return topValue;
+  }
+
+  @Override
+  public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
+    DigestIterator copy = new DigestIterator();
+    try {
+      copy.digest = MessageDigest.getInstance(digest.getAlgorithm());
+    } catch (NoSuchAlgorithmException e) {
+      throw new RuntimeException(e);
+    }
+
+    copy.topKey = this.topKey;
+    copy.topValue = this.topValue;
+    copy.source = this.source.deepCopy(env);
+
+    return copy;
+  }
+
+}
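
Finally, a brief usage sketch for DigestIterator (again illustrative, not part of this commit): it attaches the iterator to a Scanner as a scan-time iterator and reads back the single digest Value the iterator produces for the Range. The Connector, table name, and Range are assumed to come from the caller; per the javadoc above, a scan that stops and restarts mid-session would invalidate the digest.

import java.util.Map.Entry;

import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.test.replication.merkle.skvi.DigestIterator;

// Hypothetical helper, not part of this commit.
public class DigestScanSketch {

  public static byte[] digestOfRange(Connector conn, String tableName, Range range) throws Exception {
    IteratorSetting is = new IteratorSetting(50, "digest", DigestIterator.class);
    is.addOption(DigestIterator.HASH_NAME_KEY, "SHA-1"); // the option read in init()
    Scanner scanner = conn.createScanner(tableName, Authorizations.EMPTY);
    scanner.addScanIterator(is);
    scanner.setRange(range);

    byte[] digest = null;
    for (Entry<Key,Value> entry : scanner) {
      // The iterator collapses the whole range into a single Key-Value pair.
      digest = entry.getValue().get();
    }
    return digest;
  }
}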