Return-Path: X-Original-To: apmail-accumulo-commits-archive@www.apache.org Delivered-To: apmail-accumulo-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id BDAE819EC8 for ; Fri, 1 Apr 2016 17:40:04 +0000 (UTC) Received: (qmail 70340 invoked by uid 500); 1 Apr 2016 17:40:04 -0000 Delivered-To: apmail-accumulo-commits-archive@accumulo.apache.org Received: (qmail 70298 invoked by uid 500); 1 Apr 2016 17:40:04 -0000 Mailing-List: contact commits-help@accumulo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@accumulo.apache.org Delivered-To: mailing list commits@accumulo.apache.org Received: (qmail 70274 invoked by uid 99); 1 Apr 2016 17:40:04 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 01 Apr 2016 17:40:04 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 808B8DFC74; Fri, 1 Apr 2016 17:40:04 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: kturner@apache.org To: commits@accumulo.apache.org Date: Fri, 01 Apr 2016 17:40:04 -0000 Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: [1/3] accumulo git commit: ACCUMULO-4164 Avoid copying rfile index when in cache. Avoid sync when deserializing index. Repository: accumulo Updated Branches: refs/heads/master 0dd1d6a51 -> 574234293 ACCUMULO-4164 Avoid copying rfile index when in cache. Avoid sync when deserializing index. Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/2afc3dc8 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/2afc3dc8 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/2afc3dc8 Branch: refs/heads/master Commit: 2afc3dc87d158667da72a8959726bce62de5dee6 Parents: 41e002d Author: Keith Turner Authored: Fri Apr 1 08:43:48 2016 -0400 Committer: Keith Turner Committed: Fri Apr 1 12:56:40 2016 -0400 ---------------------------------------------------------------------- .../core/file/blockfile/ABlockReader.java | 2 + .../file/blockfile/impl/CachableBlockFile.java | 45 ++-- .../impl/SeekableByteArrayInputStream.java | 141 ++++++++++++ .../core/file/rfile/MultiLevelIndex.java | 219 +++++++++++++------ 4 files changed, 311 insertions(+), 96 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/2afc3dc8/core/src/main/java/org/apache/accumulo/core/file/blockfile/ABlockReader.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/ABlockReader.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/ABlockReader.java index 8df2469..9d7a01a 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/ABlockReader.java +++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/ABlockReader.java @@ -51,4 +51,6 @@ public interface ABlockReader extends DataInput { int getPosition(); T getIndex(Class clazz); + + byte[] getBuffer(); } http://git-wip-us.apache.org/repos/asf/accumulo/blob/2afc3dc8/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java index 4d65c9f..7496202 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java +++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java @@ -70,21 +70,25 @@ public class CachableBlockFile { _bc = new BCFile.Writer(fsout, compressAlgor, conf, false, accumuloConfiguration); } + @Override public ABlockWriter prepareMetaBlock(String name) throws IOException { _bw = new BlockWrite(_bc.prepareMetaBlock(name)); return _bw; } + @Override public ABlockWriter prepareMetaBlock(String name, String compressionName) throws IOException { _bw = new BlockWrite(_bc.prepareMetaBlock(name, compressionName)); return _bw; } + @Override public ABlockWriter prepareDataBlock() throws IOException { _bw = new BlockWrite(_bc.prepareDataBlock()); return _bw; } + @Override public void close() throws IOException { _bw.close(); @@ -369,6 +373,7 @@ public class CachableBlockFile { * NOTE: In the case of multi-read threads: This method can do redundant work where an entry is read from disk and other threads check the cache before it * has been inserted. */ + @Override public BlockRead getMetaBlock(String blockName) throws IOException { String _lookup = this.fileName + "M" + blockName; return getBlock(_lookup, _iCache, new MetaBlockLoader(blockName, accumuloConfiguration)); @@ -388,6 +393,7 @@ public class CachableBlockFile { * has been inserted. */ + @Override public BlockRead getDataBlock(int blockIndex) throws IOException { String _lookup = this.fileName + "O" + blockIndex; return getBlock(_lookup, _dCache, new OffsetBlockLoader(blockIndex)); @@ -400,6 +406,7 @@ public class CachableBlockFile { return getBlock(_lookup, _dCache, new RawBlockLoader(offset, compressedSize, rawSize)); } + @Override public synchronized void close() throws IOException { if (closed) return; @@ -416,30 +423,6 @@ public class CachableBlockFile { } - static class SeekableByteArrayInputStream extends ByteArrayInputStream { - - public SeekableByteArrayInputStream(byte[] buf) { - super(buf); - } - - public SeekableByteArrayInputStream(byte buf[], int offset, int length) { - super(buf, offset, length); - throw new UnsupportedOperationException("Seek code assumes offset is zero"); // do not need this constructor, documenting that seek will not work - // unless offset it kept track of - } - - public void seek(int position) { - if (pos < 0 || pos >= buf.length) - throw new IllegalArgumentException("pos = " + pos + " buf.lenght = " + buf.length); - this.pos = position; - } - - public int getPosition() { - return this.pos; - } - - } - public static class CachedBlockRead extends BlockRead { private SeekableByteArrayInputStream seekableInput; private final CacheEntry cb; @@ -470,6 +453,11 @@ public class CachableBlockFile { } @Override + public byte[] getBuffer() { + return seekableInput.getBuffer(); + } + + @Override public T getIndex(Class clazz) { T bi = null; synchronized (cb) { @@ -510,6 +498,7 @@ public class CachableBlockFile { /** * Size is the size of the bytearray that was read form the cache */ + @Override public long getRawSize() { return size; } @@ -543,5 +532,13 @@ public class CachableBlockFile { throw new UnsupportedOperationException(); } + /** + * The byte array returned by this method is only for read optimizations, it should not be modified. + */ + @Override + public byte[] getBuffer() { + throw new UnsupportedOperationException(); + } + } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/2afc3dc8/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/SeekableByteArrayInputStream.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/SeekableByteArrayInputStream.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/SeekableByteArrayInputStream.java new file mode 100644 index 0000000..c6e7d29 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/SeekableByteArrayInputStream.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.accumulo.core.file.blockfile.impl; + +import java.io.IOException; +import java.io.InputStream; + +import com.google.common.base.Preconditions; + +/** + * This class is like byte array input stream with two differences. It supports seeking and avoids synchronization. + */ +public class SeekableByteArrayInputStream extends InputStream { + + // making this volatile for the following case + // * thread 1 creates and initalizes byte array + // * thread 2 reads from bye array + // Findbugs complains about this because thread2 may not see any changes to the byte array after thread 1 set the voltile, + // however the expectation is that the byte array is static. In the case of it being static, volatile ensures that + // thread 2 sees all of thread 1 changes before setting the volatile. + private volatile byte buffer[]; + private int cur; + private int max; + + @Override + public int read() { + if (cur < max) { + return buffer[cur++] & 0xff; + } else { + return -1; + } + } + + @Override + public int read(byte b[], int offset, int length) { + if (b == null) { + throw new NullPointerException(); + } + + if (length < 0 || offset < 0 || length > b.length - offset) { + throw new IndexOutOfBoundsException(); + } + + if (length == 0) { + return 0; + } + + int avail = max - cur; + + if (avail <= 0) { + return -1; + } + + if (length > avail) { + length = avail; + } + + System.arraycopy(buffer, cur, b, offset, length); + cur += length; + return length; + } + + @Override + public long skip(long requestedSkip) { + long actualSkip = max - cur; + if (requestedSkip < actualSkip) + if (requestedSkip < 0) + actualSkip = 0; + else + actualSkip = requestedSkip; + + cur += actualSkip; + return actualSkip; + } + + @Override + public int available() { + return max - cur; + } + + @Override + public boolean markSupported() { + return false; + } + + @Override + public void mark(int readAheadLimit) { + throw new UnsupportedOperationException(); + } + + @Override + public void reset() { + throw new UnsupportedOperationException(); + } + + @Override + public void close() throws IOException {} + + public SeekableByteArrayInputStream(byte[] buf) { + Preconditions.checkNotNull(buf, "bug argument was null"); + this.buffer = buf; + this.cur = 0; + this.max = buf.length; + } + + public SeekableByteArrayInputStream(byte[] buf, int maxOffset) { + Preconditions.checkNotNull(buf, "bug argument was null"); + this.buffer = buf; + this.cur = 0; + this.max = maxOffset; + } + + public void seek(int position) { + if (position < 0 || position >= max) + throw new IllegalArgumentException("position = " + position + " maxOffset = " + max); + this.cur = position; + } + + public int getPosition() { + return this.cur; + } + + byte[] getBuffer() { + return buffer; + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/2afc3dc8/core/src/main/java/org/apache/accumulo/core/file/rfile/MultiLevelIndex.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/MultiLevelIndex.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/MultiLevelIndex.java index 2109478..75ad4c8 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/rfile/MultiLevelIndex.java +++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/MultiLevelIndex.java @@ -37,9 +37,12 @@ import org.apache.accumulo.core.file.blockfile.ABlockReader; import org.apache.accumulo.core.file.blockfile.ABlockWriter; import org.apache.accumulo.core.file.blockfile.BlockFileReader; import org.apache.accumulo.core.file.blockfile.BlockFileWriter; +import org.apache.accumulo.core.file.blockfile.impl.SeekableByteArrayInputStream; import org.apache.accumulo.core.file.rfile.bcfile.Utils; import org.apache.hadoop.io.WritableComparable; +import com.google.common.base.Preconditions; + public class MultiLevelIndex { public static class IndexEntry implements WritableComparable { @@ -129,85 +132,121 @@ public class MultiLevelIndex { } } - // a list that deserializes index entries on demand - private static class SerializedIndex extends AbstractList implements List, RandomAccess { + private static abstract class SerializedIndexBase extends AbstractList implements List, RandomAccess { + protected int[] offsets; + protected byte[] data; - private int[] offsets; - private byte[] data; - private boolean newFormat; + protected SeekableByteArrayInputStream sbais; + protected DataInputStream dis; + protected int offsetsOffset; + protected int indexOffset; + protected int numOffsets; + protected int indexSize; - SerializedIndex(int[] offsets, byte[] data, boolean newFormat) { + SerializedIndexBase(int[] offsets, byte[] data) { + Preconditions.checkNotNull(offsets, "offsets argument was null"); + Preconditions.checkNotNull(data, "data argument was null"); this.offsets = offsets; this.data = data; - this.newFormat = newFormat; + sbais = new SeekableByteArrayInputStream(data); + dis = new DataInputStream(sbais); } - @Override - public IndexEntry get(int index) { - int len; - if (index == offsets.length - 1) - len = data.length - offsets[index]; - else - len = offsets[index + 1] - offsets[index]; + SerializedIndexBase(byte[] data, int offsetsOffset, int numOffsets, int indexOffset, int indexSize) { + Preconditions.checkNotNull(data, "data argument was null"); + sbais = new SeekableByteArrayInputStream(data, indexOffset + indexSize); + dis = new DataInputStream(sbais); + this.offsetsOffset = offsetsOffset; + this.indexOffset = indexOffset; + this.numOffsets = numOffsets; + this.indexSize = indexSize; + } - ByteArrayInputStream bais = new ByteArrayInputStream(data, offsets[index], len); - DataInputStream dis = new DataInputStream(bais); + /** + * Before this method is called, {@code this.dis} is seeked to the offset of a serialized index entry. This method should deserialize the index entry by + * reading from {@code this.dis} and return it. + */ + protected abstract T newValue() throws IOException; - IndexEntry ie = new IndexEntry(newFormat); + @Override + public T get(int index) { try { - ie.readFields(dis); - } catch (IOException e) { - throw new RuntimeException(e); - } + int offset; + if (offsets == null) { + if (index < 0 || index >= numOffsets) { + throw new IndexOutOfBoundsException("index:" + index + " numOffsets:" + numOffsets); + } + sbais.seek(offsetsOffset + index * 4); + offset = dis.readInt(); + } else { + offset = offsets[index]; + } - return ie; + sbais.seek(indexOffset + offset); + return newValue(); + } catch (IOException ioe) { + throw new RuntimeException(ioe); + } } @Override public int size() { - return offsets.length; - } - - public long sizeInBytes() { - return data.length + 4 * offsets.length; + if (offsets == null) { + return numOffsets; + } else { + return offsets.length; + } } } - private static class KeyIndex extends AbstractList implements List, RandomAccess { + // a list that deserializes index entries on demand + private static class SerializedIndex extends SerializedIndexBase { - private int[] offsets; - private byte[] data; + private boolean newFormat; - KeyIndex(int[] offsets, byte[] data) { - this.offsets = offsets; - this.data = data; + SerializedIndex(int[] offsets, byte[] data, boolean newFormat) { + super(offsets, data); + this.newFormat = newFormat; + } + + SerializedIndex(byte[] data, int offsetsOffset, int numOffsets, int indexOffset, int indexSize) { + super(data, offsetsOffset, numOffsets, indexOffset, indexSize); + this.newFormat = true; + } + + public long sizeInBytes() { + if (offsets == null) { + return indexSize + 4 * numOffsets; + } else { + return data.length + 4 * offsets.length; + } } @Override - public Key get(int index) { - int len; - if (index == offsets.length - 1) - len = data.length - offsets[index]; - else - len = offsets[index + 1] - offsets[index]; + protected IndexEntry newValue() throws IOException { + IndexEntry ie = new IndexEntry(newFormat); + ie.readFields(dis); + return ie; + } + + } - ByteArrayInputStream bais = new ByteArrayInputStream(data, offsets[index], len); - DataInputStream dis = new DataInputStream(bais); + private static class KeyIndex extends SerializedIndexBase { - Key key = new Key(); - try { - key.readFields(dis); - } catch (IOException e) { - throw new RuntimeException(e); - } + KeyIndex(int[] offsets, byte[] data) { + super(offsets, data); + } - return key; + KeyIndex(byte[] data, int offsetsOffset, int numOffsets, int indexOffset, int indexSize) { + super(data, offsetsOffset, numOffsets, indexOffset, indexSize); } @Override - public int size() { - return offsets.length; + protected Key newValue() throws IOException { + Key key = new Key(); + key.readFields(dis); + return key; } } @@ -219,11 +258,16 @@ public class MultiLevelIndex { private ArrayList offsets; private int level; private int offset; - - SerializedIndex index; - KeyIndex keyIndex; private boolean hasNext; + private byte data[]; + private int[] offsetsArray; + private int numOffsets; + private int offsetsOffset; + private int indexSize; + private int indexOffset; + private boolean newFormat; + public IndexBlock(int level, int totalAdded) { // System.out.println("IndexBlock("+level+","+levelCount+","+totalAdded+")"); @@ -270,18 +314,35 @@ public class MultiLevelIndex { offset = in.readInt(); hasNext = in.readBoolean(); - int numOffsets = in.readInt(); - int[] offsets = new int[numOffsets]; - - for (int i = 0; i < numOffsets; i++) - offsets[i] = in.readInt(); + ABlockReader abr = (ABlockReader) in; + if (abr.isIndexable()) { + // this block is cahced, so avoid copy + data = abr.getBuffer(); + // use offset data in serialized form and avoid copy + numOffsets = abr.readInt(); + offsetsOffset = abr.getPosition(); + int skipped = abr.skipBytes(numOffsets * 4); + if (skipped != numOffsets * 4) { + throw new IOException("Skipped less than expected " + skipped + " " + (numOffsets * 4)); + } + indexSize = in.readInt(); + indexOffset = abr.getPosition(); + skipped = abr.skipBytes(indexSize); + if (skipped != indexSize) { + throw new IOException("Skipped less than expected " + skipped + " " + indexSize); + } + } else { + numOffsets = in.readInt(); + offsetsArray = new int[numOffsets]; - int indexSize = in.readInt(); - byte[] serializedIndex = new byte[indexSize]; - in.readFully(serializedIndex); + for (int i = 0; i < numOffsets; i++) + offsetsArray[i] = in.readInt(); - index = new SerializedIndex(offsets, serializedIndex, true); - keyIndex = new KeyIndex(offsets, serializedIndex); + indexSize = in.readInt(); + data = new byte[indexSize]; + in.readFully(data); + newFormat = true; + } } else if (version == RFile.RINDEX_VER_3) { level = 0; offset = 0; @@ -307,9 +368,9 @@ public class MultiLevelIndex { oia[i] = oal.get(i); } - byte[] serializedIndex = baos.toByteArray(); - index = new SerializedIndex(oia, serializedIndex, false); - keyIndex = new KeyIndex(oia, serializedIndex); + data = baos.toByteArray(); + offsetsArray = oia; + newFormat = false; } else if (version == RFile.RINDEX_VER_4) { level = 0; offset = 0; @@ -325,8 +386,9 @@ public class MultiLevelIndex { byte[] indexData = new byte[size]; in.readFully(indexData); - index = new SerializedIndex(offsets, indexData, false); - keyIndex = new KeyIndex(offsets, indexData); + data = indexData; + offsetsArray = offsets; + newFormat = false; } else { throw new RuntimeException("Unexpected version " + version); } @@ -334,11 +396,23 @@ public class MultiLevelIndex { } List getIndex() { - return index; + // create SerializedIndex on demand as each has an internal input stream over byte array... keeping a SerializedIndex ref for the object could lead to + // problems with deep copies. + if (offsetsArray == null) { + return new SerializedIndex(data, offsetsOffset, numOffsets, indexOffset, indexSize); + } else { + return new SerializedIndex(offsetsArray, data, newFormat); + } } public List getKeyIndex() { - return keyIndex; + // create KeyIndex on demand as each has an internal input stream over byte array... keeping a KeyIndex ref for the object could lead to problems with + // deep copies. + if (offsetsArray == null) { + return new KeyIndex(data, offsetsOffset, numOffsets, indexOffset, indexSize); + } else { + return new KeyIndex(offsetsArray, data); + } } int getLevel() { @@ -761,14 +835,15 @@ public class MultiLevelIndex { if (count == null) count = 0l; - size += ib.index.sizeInBytes(); + List index = ib.getIndex(); + size += ((SerializedIndex) index).sizeInBytes(); count++; sizesByLevel.put(ib.getLevel(), size); countsByLevel.put(ib.getLevel(), count); if (ib.getLevel() > 0) { - for (IndexEntry ie : ib.index) { + for (IndexEntry ie : index) { IndexBlock cib = getIndexBlock(ie); getIndexInfo(cib, sizesByLevel, countsByLevel); }