Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id AC705200CFA for ; Fri, 28 Jul 2017 01:32:23 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id A926E16BD6E; Thu, 27 Jul 2017 23:32:23 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 85E2416BD69 for ; Fri, 28 Jul 2017 01:32:22 +0200 (CEST) Received: (qmail 65359 invoked by uid 500); 27 Jul 2017 23:32:21 -0000 Mailing-List: contact commits-help@accumulo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@accumulo.apache.org Delivered-To: mailing list commits@accumulo.apache.org Received: (qmail 65271 invoked by uid 99); 27 Jul 2017 23:32:20 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 27 Jul 2017 23:32:20 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 85610F32EF; Thu, 27 Jul 2017 23:32:18 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: elserj@apache.org To: commits@accumulo.apache.org Date: Thu, 27 Jul 2017 23:32:21 -0000 Message-Id: <9e52292e774a4df0b6f36580a96027cb@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [4/6] accumulo git commit: Merge branch '1.7' into 1.8 archived-at: Thu, 27 Jul 2017 23:32:23 -0000 Merge branch '1.7' into 1.8 Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/3f01418e Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/3f01418e Diff: 
http://git-wip-us.apache.org/repos/asf/accumulo/diff/3f01418e Branch: refs/heads/master Commit: 3f01418e6ae134ef707f333088ee2ba0902b4a2d Parents: 018c7fe f08c58c Author: Josh Elser Authored: Thu Jul 27 19:01:08 2017 -0400 Committer: Josh Elser Committed: Thu Jul 27 19:02:10 2017 -0400 ---------------------------------------------------------------------- .../file/streams/BoundedRangeFileInputStream.java | 16 +--------------- .../java/org/apache/accumulo/core/util/Jar.java | 3 +-- .../accumulo/monitor/servlets/BasicServlet.java | 10 +++++++++- .../accumulo/monitor/servlets/DefaultServlet.java | 6 +++++- .../accumulo/monitor/servlets/OperationServlet.java | 15 ++++++++++----- 5 files changed, 26 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/3f01418e/core/src/main/java/org/apache/accumulo/core/file/streams/BoundedRangeFileInputStream.java ---------------------------------------------------------------------- diff --cc core/src/main/java/org/apache/accumulo/core/file/streams/BoundedRangeFileInputStream.java index a033ad4,0000000..95ef4ea mode 100644,000000..100644 --- a/core/src/main/java/org/apache/accumulo/core/file/streams/BoundedRangeFileInputStream.java +++ b/core/src/main/java/org/apache/accumulo/core/file/streams/BoundedRangeFileInputStream.java @@@ -1,157 -1,0 +1,143 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.accumulo.core.file.streams; + +import java.io.IOException; +import java.io.InputStream; - import java.security.AccessController; - import java.security.PrivilegedActionException; - import java.security.PrivilegedExceptionAction; + +import org.apache.hadoop.fs.Seekable; + +/** + * BoundedRangeFileInputStream abstracts a contiguous region of a Hadoop FSDataInputStream as a regular input stream. One can create multiple + * BoundedRangeFileInputStream on top of the same FSDataInputStream and they would not interfere with each other. + */ +public class BoundedRangeFileInputStream extends InputStream { + + private volatile boolean closed = false; + private final InputStream in; + private long pos; + private long end; + private long mark; + private final byte[] oneByte = new byte[1]; + + /** + * Constructor + * + * @param in + * The FSDataInputStream we connect to. + * @param offset + * Beginning offset of the region. + * @param length + * Length of the region. + * + * The actual length of the region may be smaller if (offset + length) goes beyond the end of FS input stream. 
+ */ + public BoundedRangeFileInputStream(StreamType in, long offset, long length) { + if (offset < 0 || length < 0) { + throw new IndexOutOfBoundsException("Invalid offset/length: " + offset + "/" + length); + } + + this.in = in; + this.pos = offset; + this.end = offset + length; + this.mark = -1; + } + + @Override + public int available() throws IOException { + return (int) (end - pos); + } + + @Override + public int read() throws IOException { + int ret = read(oneByte); + if (ret == 1) + return oneByte[0] & 0xff; + return -1; + } + + @Override + public int read(byte[] b) throws IOException { + return read(b, 0, b.length); + } + + @Override + public int read(final byte[] b, final int off, int len) throws IOException { + if ((off | len | (off + len) | (b.length - (off + len))) < 0) { + throw new IndexOutOfBoundsException(); + } + + final int n = (int) Math.min(Integer.MAX_VALUE, Math.min(len, (end - pos))); + if (n == 0) + return -1; + Integer ret = 0; + synchronized (in) { + // ensuring we are not closed which would be followed by someone else reusing the decompressor + if (closed) { + throw new IOException("Stream closed"); + } + ((Seekable) in).seek(pos); - try { - ret = AccessController.doPrivileged(new PrivilegedExceptionAction() { - @Override - public Integer run() throws IOException { - int ret = 0; - ret = in.read(b, off, n); - return ret; - } - }); - } catch (PrivilegedActionException e) { - throw (IOException) e.getException(); - } ++ ret = in.read(b, off, n); + } + if (ret < 0) { + end = pos; + return -1; + } + pos += ret; + return ret; + } + + @Override + /* + * We may skip beyond the end of the file. 
+ */ + public long skip(long n) throws IOException { + long len = Math.min(n, end - pos); + pos += len; + return len; + } + + @Override + public void mark(int readlimit) { + mark = pos; + } + + @Override + public void reset() throws IOException { + if (mark < 0) + throw new IOException("Resetting to invalid mark"); + pos = mark; + } + + @Override + public boolean markSupported() { + return true; + } + + @Override + public void close() { + // Synchronize on the FSDataInputStream to ensure we are blocked if in the read method: + // Once this close completes, the underlying decompression stream may be returned to + // the pool and subsequently used. Turns out this is a problem if currently using it to read. + if (!closed) { + synchronized (in) { + // Invalidate the state of the stream. + closed = true; + } + } + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/3f01418e/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/BasicServlet.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/3f01418e/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/DefaultServlet.java ---------------------------------------------------------------------- diff --cc server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/DefaultServlet.java index 383d7bc,0f83b68..1e720ec --- a/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/DefaultServlet.java +++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/DefaultServlet.java @@@ -39,6 -42,13 +39,8 @@@ import org.apache.accumulo.monitor.Moni import org.apache.accumulo.monitor.ZooKeeperStatus; import org.apache.accumulo.monitor.ZooKeeperStatus.ZooKeeperState; import org.apache.accumulo.monitor.util.celltypes.NumberType; -import org.apache.accumulo.server.fs.VolumeManager; -import org.apache.accumulo.server.fs.VolumeManagerImpl; -import org.apache.hadoop.fs.ContentSummary; -import 
org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; + import org.slf4j.Logger; + import org.slf4j.LoggerFactory; public class DefaultServlet extends BasicServlet {