accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ktur...@apache.org
Subject [1/3] accumulo git commit: ACCUMULO-4164 Avoid copying rfile index when in cache. Avoid sync when deserializing index.
Date Fri, 01 Apr 2016 17:40:04 GMT
Repository: accumulo
Updated Branches:
  refs/heads/master 0dd1d6a51 -> 574234293


ACCUMULO-4164 Avoid copying rfile index when in cache.  Avoid sync when deserializing index.


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/2afc3dc8
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/2afc3dc8
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/2afc3dc8

Branch: refs/heads/master
Commit: 2afc3dc87d158667da72a8959726bce62de5dee6
Parents: 41e002d
Author: Keith Turner <kturner@apache.org>
Authored: Fri Apr 1 08:43:48 2016 -0400
Committer: Keith Turner <kturner@apache.org>
Committed: Fri Apr 1 12:56:40 2016 -0400

----------------------------------------------------------------------
 .../core/file/blockfile/ABlockReader.java       |   2 +
 .../file/blockfile/impl/CachableBlockFile.java  |  45 ++--
 .../impl/SeekableByteArrayInputStream.java      | 141 ++++++++++++
 .../core/file/rfile/MultiLevelIndex.java        | 219 +++++++++++++------
 4 files changed, 311 insertions(+), 96 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/2afc3dc8/core/src/main/java/org/apache/accumulo/core/file/blockfile/ABlockReader.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/ABlockReader.java
b/core/src/main/java/org/apache/accumulo/core/file/blockfile/ABlockReader.java
index 8df2469..9d7a01a 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/ABlockReader.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/ABlockReader.java
@@ -51,4 +51,6 @@ public interface ABlockReader extends DataInput {
   int getPosition();
 
   <T> T getIndex(Class<T> clazz);
+
+  byte[] getBuffer();
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/2afc3dc8/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java
b/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java
index 4d65c9f..7496202 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java
@@ -70,21 +70,25 @@ public class CachableBlockFile {
       _bc = new BCFile.Writer(fsout, compressAlgor, conf, false, accumuloConfiguration);
     }
 
+    @Override
     public ABlockWriter prepareMetaBlock(String name) throws IOException {
       _bw = new BlockWrite(_bc.prepareMetaBlock(name));
       return _bw;
     }
 
+    @Override
     public ABlockWriter prepareMetaBlock(String name, String compressionName) throws IOException
{
       _bw = new BlockWrite(_bc.prepareMetaBlock(name, compressionName));
       return _bw;
     }
 
+    @Override
     public ABlockWriter prepareDataBlock() throws IOException {
       _bw = new BlockWrite(_bc.prepareDataBlock());
       return _bw;
     }
 
+    @Override
     public void close() throws IOException {
 
       _bw.close();
@@ -369,6 +373,7 @@ public class CachableBlockFile {
      * NOTE: In the case of multi-read threads: This method can do redundant work where an
entry is read from disk and other threads check the cache before it
      * has been inserted.
      */
+    @Override
     public BlockRead getMetaBlock(String blockName) throws IOException {
       String _lookup = this.fileName + "M" + blockName;
       return getBlock(_lookup, _iCache, new MetaBlockLoader(blockName, accumuloConfiguration));
@@ -388,6 +393,7 @@ public class CachableBlockFile {
      * has been inserted.
      */
 
+    @Override
     public BlockRead getDataBlock(int blockIndex) throws IOException {
       String _lookup = this.fileName + "O" + blockIndex;
       return getBlock(_lookup, _dCache, new OffsetBlockLoader(blockIndex));
@@ -400,6 +406,7 @@ public class CachableBlockFile {
       return getBlock(_lookup, _dCache, new RawBlockLoader(offset, compressedSize, rawSize));
     }
 
+    @Override
     public synchronized void close() throws IOException {
       if (closed)
         return;
@@ -416,30 +423,6 @@ public class CachableBlockFile {
 
   }
 
-  static class SeekableByteArrayInputStream extends ByteArrayInputStream {
-
-    public SeekableByteArrayInputStream(byte[] buf) {
-      super(buf);
-    }
-
-    public SeekableByteArrayInputStream(byte buf[], int offset, int length) {
-      super(buf, offset, length);
-      throw new UnsupportedOperationException("Seek code assumes offset is zero"); // do
not need this constructor, documenting that seek will not work
-                                                                                   // unless
offset it kept track of
-    }
-
-    public void seek(int position) {
-      if (pos < 0 || pos >= buf.length)
-        throw new IllegalArgumentException("pos = " + pos + " buf.lenght = " + buf.length);
-      this.pos = position;
-    }
-
-    public int getPosition() {
-      return this.pos;
-    }
-
-  }
-
   public static class CachedBlockRead extends BlockRead {
     private SeekableByteArrayInputStream seekableInput;
     private final CacheEntry cb;
@@ -470,6 +453,11 @@ public class CachableBlockFile {
     }
 
     @Override
+    public byte[] getBuffer() {
+      return seekableInput.getBuffer();
+    }
+
+    @Override
     public <T> T getIndex(Class<T> clazz) {
       T bi = null;
       synchronized (cb) {
@@ -510,6 +498,7 @@ public class CachableBlockFile {
     /**
      * Size is the size of the bytearray that was read form the cache
      */
+    @Override
     public long getRawSize() {
       return size;
     }
@@ -543,5 +532,13 @@ public class CachableBlockFile {
       throw new UnsupportedOperationException();
     }
 
+    /**
+     * The byte array returned by this method is only for read optimizations, it should not
be modified.
+     */
+    @Override
+    public byte[] getBuffer() {
+      throw new UnsupportedOperationException();
+    }
+
   }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/2afc3dc8/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/SeekableByteArrayInputStream.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/SeekableByteArrayInputStream.java
b/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/SeekableByteArrayInputStream.java
new file mode 100644
index 0000000..c6e7d29
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/SeekableByteArrayInputStream.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.file.blockfile.impl;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * This class is like ByteArrayInputStream with two differences: it supports seeking and it
avoids synchronization.
+ */
+public class SeekableByteArrayInputStream extends InputStream {
+
+  // making this volatile for the following case
+  // * thread 1 creates and initializes byte array
+  // * thread 2 reads from byte array
+  // Findbugs complains about this because thread2 may not see any changes to the byte array
after thread 1 set the volatile,
+  // however the expectation is that the byte array is static. In the case of it being static,
volatile ensures that
+  // thread 2 sees all of the changes thread 1 made before it set the volatile.
+  private volatile byte buffer[];
+  private int cur;
+  private int max;
+
+  @Override
+  public int read() {
+    if (cur < max) {
+      return buffer[cur++] & 0xff;
+    } else {
+      return -1;
+    }
+  }
+
+  @Override
+  public int read(byte b[], int offset, int length) {
+    if (b == null) {
+      throw new NullPointerException();
+    }
+
+    if (length < 0 || offset < 0 || length > b.length - offset) {
+      throw new IndexOutOfBoundsException();
+    }
+
+    if (length == 0) {
+      return 0;
+    }
+
+    int avail = max - cur;
+
+    if (avail <= 0) {
+      return -1;
+    }
+
+    if (length > avail) {
+      length = avail;
+    }
+
+    System.arraycopy(buffer, cur, b, offset, length);
+    cur += length;
+    return length;
+  }
+
+  @Override
+  public long skip(long requestedSkip) {
+    long actualSkip = max - cur;
+    if (requestedSkip < actualSkip)
+      if (requestedSkip < 0)
+        actualSkip = 0;
+      else
+        actualSkip = requestedSkip;
+
+    cur += actualSkip;
+    return actualSkip;
+  }
+
+  @Override
+  public int available() {
+    return max - cur;
+  }
+
+  @Override
+  public boolean markSupported() {
+    return false;
+  }
+
+  @Override
+  public void mark(int readAheadLimit) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void reset() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void close() throws IOException {}
+
+  public SeekableByteArrayInputStream(byte[] buf) {
+    Preconditions.checkNotNull(buf, "bug argument was null");
+    this.buffer = buf;
+    this.cur = 0;
+    this.max = buf.length;
+  }
+
+  public SeekableByteArrayInputStream(byte[] buf, int maxOffset) {
+    Preconditions.checkNotNull(buf, "bug argument was null");
+    this.buffer = buf;
+    this.cur = 0;
+    this.max = maxOffset;
+  }
+
+  public void seek(int position) {
+    if (position < 0 || position >= max)
+      throw new IllegalArgumentException("position = " + position + " maxOffset = " + max);
+    this.cur = position;
+  }
+
+  public int getPosition() {
+    return this.cur;
+  }
+
+  byte[] getBuffer() {
+    return buffer;
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/2afc3dc8/core/src/main/java/org/apache/accumulo/core/file/rfile/MultiLevelIndex.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/MultiLevelIndex.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/MultiLevelIndex.java
index 2109478..75ad4c8 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/MultiLevelIndex.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/MultiLevelIndex.java
@@ -37,9 +37,12 @@ import org.apache.accumulo.core.file.blockfile.ABlockReader;
 import org.apache.accumulo.core.file.blockfile.ABlockWriter;
 import org.apache.accumulo.core.file.blockfile.BlockFileReader;
 import org.apache.accumulo.core.file.blockfile.BlockFileWriter;
+import org.apache.accumulo.core.file.blockfile.impl.SeekableByteArrayInputStream;
 import org.apache.accumulo.core.file.rfile.bcfile.Utils;
 import org.apache.hadoop.io.WritableComparable;
 
+import com.google.common.base.Preconditions;
+
 public class MultiLevelIndex {
 
   public static class IndexEntry implements WritableComparable<IndexEntry> {
@@ -129,85 +132,121 @@ public class MultiLevelIndex {
     }
   }
 
-  // a list that deserializes index entries on demand
-  private static class SerializedIndex extends AbstractList<IndexEntry> implements
List<IndexEntry>, RandomAccess {
+  private static abstract class SerializedIndexBase<T> extends AbstractList<T>
implements List<T>, RandomAccess {
+    protected int[] offsets;
+    protected byte[] data;
 
-    private int[] offsets;
-    private byte[] data;
-    private boolean newFormat;
+    protected SeekableByteArrayInputStream sbais;
+    protected DataInputStream dis;
+    protected int offsetsOffset;
+    protected int indexOffset;
+    protected int numOffsets;
+    protected int indexSize;
 
-    SerializedIndex(int[] offsets, byte[] data, boolean newFormat) {
+    SerializedIndexBase(int[] offsets, byte[] data) {
+      Preconditions.checkNotNull(offsets, "offsets argument was null");
+      Preconditions.checkNotNull(data, "data argument was null");
       this.offsets = offsets;
       this.data = data;
-      this.newFormat = newFormat;
+      sbais = new SeekableByteArrayInputStream(data);
+      dis = new DataInputStream(sbais);
     }
 
-    @Override
-    public IndexEntry get(int index) {
-      int len;
-      if (index == offsets.length - 1)
-        len = data.length - offsets[index];
-      else
-        len = offsets[index + 1] - offsets[index];
+    SerializedIndexBase(byte[] data, int offsetsOffset, int numOffsets, int indexOffset,
int indexSize) {
+      Preconditions.checkNotNull(data, "data argument was null");
+      sbais = new SeekableByteArrayInputStream(data, indexOffset + indexSize);
+      dis = new DataInputStream(sbais);
+      this.offsetsOffset = offsetsOffset;
+      this.indexOffset = indexOffset;
+      this.numOffsets = numOffsets;
+      this.indexSize = indexSize;
+    }
 
-      ByteArrayInputStream bais = new ByteArrayInputStream(data, offsets[index], len);
-      DataInputStream dis = new DataInputStream(bais);
+    /**
+     * Before this method is called, {@code this.dis} is seeked to the offset of a serialized
index entry. This method should deserialize the index entry by
+     * reading from {@code this.dis} and return it.
+     */
+    protected abstract T newValue() throws IOException;
 
-      IndexEntry ie = new IndexEntry(newFormat);
+    @Override
+    public T get(int index) {
       try {
-        ie.readFields(dis);
-      } catch (IOException e) {
-        throw new RuntimeException(e);
-      }
+        int offset;
+        if (offsets == null) {
+          if (index < 0 || index >= numOffsets) {
+            throw new IndexOutOfBoundsException("index:" + index + " numOffsets:" + numOffsets);
+          }
+          sbais.seek(offsetsOffset + index * 4);
+          offset = dis.readInt();
+        } else {
+          offset = offsets[index];
+        }
 
-      return ie;
+        sbais.seek(indexOffset + offset);
+        return newValue();
+      } catch (IOException ioe) {
+        throw new RuntimeException(ioe);
+      }
     }
 
     @Override
     public int size() {
-      return offsets.length;
-    }
-
-    public long sizeInBytes() {
-      return data.length + 4 * offsets.length;
+      if (offsets == null) {
+        return numOffsets;
+      } else {
+        return offsets.length;
+      }
     }
 
   }
 
-  private static class KeyIndex extends AbstractList<Key> implements List<Key>,
RandomAccess {
+  // a list that deserializes index entries on demand
+  private static class SerializedIndex extends SerializedIndexBase<IndexEntry> {
 
-    private int[] offsets;
-    private byte[] data;
+    private boolean newFormat;
 
-    KeyIndex(int[] offsets, byte[] data) {
-      this.offsets = offsets;
-      this.data = data;
+    SerializedIndex(int[] offsets, byte[] data, boolean newFormat) {
+      super(offsets, data);
+      this.newFormat = newFormat;
+    }
+
+    SerializedIndex(byte[] data, int offsetsOffset, int numOffsets, int indexOffset, int
indexSize) {
+      super(data, offsetsOffset, numOffsets, indexOffset, indexSize);
+      this.newFormat = true;
+    }
+
+    public long sizeInBytes() {
+      if (offsets == null) {
+        return indexSize + 4 * numOffsets;
+      } else {
+        return data.length + 4 * offsets.length;
+      }
     }
 
     @Override
-    public Key get(int index) {
-      int len;
-      if (index == offsets.length - 1)
-        len = data.length - offsets[index];
-      else
-        len = offsets[index + 1] - offsets[index];
+    protected IndexEntry newValue() throws IOException {
+      IndexEntry ie = new IndexEntry(newFormat);
+      ie.readFields(dis);
+      return ie;
+    }
+
+  }
 
-      ByteArrayInputStream bais = new ByteArrayInputStream(data, offsets[index], len);
-      DataInputStream dis = new DataInputStream(bais);
+  private static class KeyIndex extends SerializedIndexBase<Key> {
 
-      Key key = new Key();
-      try {
-        key.readFields(dis);
-      } catch (IOException e) {
-        throw new RuntimeException(e);
-      }
+    KeyIndex(int[] offsets, byte[] data) {
+      super(offsets, data);
+    }
 
-      return key;
+    KeyIndex(byte[] data, int offsetsOffset, int numOffsets, int indexOffset, int indexSize)
{
+      super(data, offsetsOffset, numOffsets, indexOffset, indexSize);
     }
 
     @Override
-    public int size() {
-      return offsets.length;
+    protected Key newValue() throws IOException {
+      Key key = new Key();
+      key.readFields(dis);
+      return key;
     }
   }
 
@@ -219,11 +258,16 @@ public class MultiLevelIndex {
     private ArrayList<Integer> offsets;
     private int level;
     private int offset;
-
-    SerializedIndex index;
-    KeyIndex keyIndex;
     private boolean hasNext;
 
+    private byte data[];
+    private int[] offsetsArray;
+    private int numOffsets;
+    private int offsetsOffset;
+    private int indexSize;
+    private int indexOffset;
+    private boolean newFormat;
+
     public IndexBlock(int level, int totalAdded) {
       // System.out.println("IndexBlock("+level+","+levelCount+","+totalAdded+")");
 
@@ -270,18 +314,35 @@ public class MultiLevelIndex {
         offset = in.readInt();
         hasNext = in.readBoolean();
 
-        int numOffsets = in.readInt();
-        int[] offsets = new int[numOffsets];
-
-        for (int i = 0; i < numOffsets; i++)
-          offsets[i] = in.readInt();
+        ABlockReader abr = (ABlockReader) in;
+        if (abr.isIndexable()) {
+          // this block is cached, so avoid a copy
+          data = abr.getBuffer();
+          // use offset data in serialized form and avoid copy
+          numOffsets = abr.readInt();
+          offsetsOffset = abr.getPosition();
+          int skipped = abr.skipBytes(numOffsets * 4);
+          if (skipped != numOffsets * 4) {
+            throw new IOException("Skipped less than expected " + skipped + " " + (numOffsets
* 4));
+          }
+          indexSize = in.readInt();
+          indexOffset = abr.getPosition();
+          skipped = abr.skipBytes(indexSize);
+          if (skipped != indexSize) {
+            throw new IOException("Skipped less than expected " + skipped + " " + indexSize);
+          }
+        } else {
+          numOffsets = in.readInt();
+          offsetsArray = new int[numOffsets];
 
-        int indexSize = in.readInt();
-        byte[] serializedIndex = new byte[indexSize];
-        in.readFully(serializedIndex);
+          for (int i = 0; i < numOffsets; i++)
+            offsetsArray[i] = in.readInt();
 
-        index = new SerializedIndex(offsets, serializedIndex, true);
-        keyIndex = new KeyIndex(offsets, serializedIndex);
+          indexSize = in.readInt();
+          data = new byte[indexSize];
+          in.readFully(data);
+          newFormat = true;
+        }
       } else if (version == RFile.RINDEX_VER_3) {
         level = 0;
         offset = 0;
@@ -307,9 +368,9 @@ public class MultiLevelIndex {
           oia[i] = oal.get(i);
         }
 
-        byte[] serializedIndex = baos.toByteArray();
-        index = new SerializedIndex(oia, serializedIndex, false);
-        keyIndex = new KeyIndex(oia, serializedIndex);
+        data = baos.toByteArray();
+        offsetsArray = oia;
+        newFormat = false;
       } else if (version == RFile.RINDEX_VER_4) {
         level = 0;
         offset = 0;
@@ -325,8 +386,9 @@ public class MultiLevelIndex {
         byte[] indexData = new byte[size];
         in.readFully(indexData);
 
-        index = new SerializedIndex(offsets, indexData, false);
-        keyIndex = new KeyIndex(offsets, indexData);
+        data = indexData;
+        offsetsArray = offsets;
+        newFormat = false;
       } else {
         throw new RuntimeException("Unexpected version " + version);
       }
@@ -334,11 +396,23 @@ public class MultiLevelIndex {
     }
 
     List<IndexEntry> getIndex() {
-      return index;
+      // create SerializedIndex on demand as each has an internal input stream over byte
array... keeping a SerializedIndex ref for the object could lead to
+      // problems with deep copies.
+      if (offsetsArray == null) {
+        return new SerializedIndex(data, offsetsOffset, numOffsets, indexOffset, indexSize);
+      } else {
+        return new SerializedIndex(offsetsArray, data, newFormat);
+      }
     }
 
     public List<Key> getKeyIndex() {
-      return keyIndex;
+      // create KeyIndex on demand as each has an internal input stream over byte array...
keeping a KeyIndex ref for the object could lead to problems with
+      // deep copies.
+      if (offsetsArray == null) {
+        return new KeyIndex(data, offsetsOffset, numOffsets, indexOffset, indexSize);
+      } else {
+        return new KeyIndex(offsetsArray, data);
+      }
     }
 
     int getLevel() {
@@ -761,14 +835,15 @@ public class MultiLevelIndex {
       if (count == null)
         count = 0l;
 
-      size += ib.index.sizeInBytes();
+      List<IndexEntry> index = ib.getIndex();
+      size += ((SerializedIndex) index).sizeInBytes();
       count++;
 
       sizesByLevel.put(ib.getLevel(), size);
       countsByLevel.put(ib.getLevel(), count);
 
       if (ib.getLevel() > 0) {
-        for (IndexEntry ie : ib.index) {
+        for (IndexEntry ie : index) {
           IndexBlock cib = getIndexBlock(ie);
           getIndexInfo(cib, sizesByLevel, countsByLevel);
         }


Mime
View raw message