accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ktur...@apache.org
Subject svn commit: r1329425 - in /accumulo/trunk/core/src: main/java/org/apache/accumulo/core/file/rfile/MultiLevelIndex.java main/java/org/apache/accumulo/core/file/rfile/RFile.java test/java/org/apache/accumulo/core/file/rfile/MultiLevelIndexTest.java
Date Mon, 23 Apr 2012 20:19:38 GMT
Author: kturner
Date: Mon Apr 23 20:19:37 2012
New Revision: 1329425

URL: http://svn.apache.org/viewvc?rev=1329425&view=rev
Log:
ACCUMULO-550 Added code to buffer rfile index entries so that multiple index blocks are written
contiguously

Modified:
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/MultiLevelIndex.java
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
    accumulo/trunk/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiLevelIndexTest.java

Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/MultiLevelIndex.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/MultiLevelIndex.java?rev=1329425&r1=1329424&r2=1329425&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/MultiLevelIndex.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/MultiLevelIndex.java Mon Apr 23 20:19:37 2012
@@ -346,6 +346,60 @@ public class MultiLevelIndex {
     
   }
   
+  /**
+   * this class buffers writes to the index so that chunks of index blocks are contiguous in the file instead of having index blocks sprinkled throughout the
+   * file making scans of the entire index slow.
+   */
+  public static class BufferedWriter {
+    
+    private Writer writer;
+    private DataOutputStream buffer;
+    private int buffered;
+    private ByteArrayOutputStream baos;
+    
+    public BufferedWriter(Writer writer) {
+      this.writer = writer;
+      baos = new ByteArrayOutputStream(1 << 20);
+      buffer = new DataOutputStream(baos);
+      buffered = 0;
+    }
+    
+    private void flush() throws IOException {
+      buffer.close();
+      
+      DataInputStream dis = new DataInputStream(new ByteArrayInputStream(baos.toByteArray()));
+      
+      IndexEntry ie = new IndexEntry(true);
+      for (int i = 0; i < buffered; i++) {
+        ie.readFields(dis);
+        writer.add(ie.getKey(), ie.getNumEntries(), ie.getOffset(), ie.getCompressedSize(), ie.getRawSize());
+      }
+      
+      buffered = 0;
+      baos = new ByteArrayOutputStream(1 << 20);
+      buffer = new DataOutputStream(baos);
+      
+    }
+    
+    public void add(Key key, int data, long offset, long compressedSize, long rawSize) throws IOException {
+      if (buffer.size() > (10 * 1 << 20)) {
+        flush();
+      }
+      
+      new IndexEntry(key, data, offset, compressedSize, rawSize).write(buffer);
+      buffered++;
+    }
+    
+    public void addLast(Key key, int data, long offset, long compressedSize, long rawSize) throws IOException {
+      flush();
+      writer.addLast(key, data, offset, compressedSize, rawSize);
+    }
+    
+    public void close(DataOutput out) throws IOException {
+      writer.close(out);
+    }
+  }
+
   public static class Writer {
     private int threshold;
     

Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java?rev=1329425&r1=1329424&r2=1329425&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java Mon Apr 23 20:19:37 2012
@@ -103,7 +103,7 @@ public class RFile {
     private String name;
     private Set<ByteSequence> previousColumnFamilies;
     
-    private MultiLevelIndex.Writer indexWriter;
+    private MultiLevelIndex.BufferedWriter indexWriter;
     private MultiLevelIndex.Reader indexReader;
     
     public LocalityGroupMetadata(int version, BlockFileReader br) {
@@ -117,7 +117,7 @@ public class RFile {
       columnFamilies = new HashMap<ByteSequence,Count>();
       previousColumnFamilies = pcf;
       
-      indexWriter = new MultiLevelIndex.Writer(bfw, indexBlockSize);
+      indexWriter = new MultiLevelIndex.BufferedWriter(new MultiLevelIndex.Writer(bfw, indexBlockSize));
     }
     
     public LocalityGroupMetadata(String name, Set<ByteSequence> cfset, int nextBlock, int indexBlockSize, BlockFileWriter bfw) {
@@ -129,7 +129,7 @@ public class RFile {
         columnFamilies.put(cf, new Count(0));
       }
       
-      indexWriter = new MultiLevelIndex.Writer(bfw, indexBlockSize);
+      indexWriter = new MultiLevelIndex.BufferedWriter(new MultiLevelIndex.Writer(bfw, indexBlockSize));
     }
     
     private Key getFirstKey() {

Modified: accumulo/trunk/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiLevelIndexTest.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiLevelIndexTest.java?rev=1329425&r1=1329424&r2=1329425&view=diff
==============================================================================
--- accumulo/trunk/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiLevelIndexTest.java (original)
+++ accumulo/trunk/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiLevelIndexTest.java Mon Apr 23 20:19:37 2012
@@ -26,11 +26,11 @@ import org.apache.accumulo.core.data.Key
 import org.apache.accumulo.core.file.blockfile.ABlockWriter;
 import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile;
 import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile.BlockRead;
-import org.apache.accumulo.core.file.rfile.RFile;
+import org.apache.accumulo.core.file.rfile.MultiLevelIndex.BufferedWriter;
 import org.apache.accumulo.core.file.rfile.MultiLevelIndex.IndexEntry;
 import org.apache.accumulo.core.file.rfile.MultiLevelIndex.Reader;
-import org.apache.accumulo.core.file.rfile.MultiLevelIndex.Writer;
 import org.apache.accumulo.core.file.rfile.MultiLevelIndex.Reader.IndexIterator;
+import org.apache.accumulo.core.file.rfile.MultiLevelIndex.Writer;
 import org.apache.accumulo.core.file.rfile.RFileTest.SeekableByteArrayInputStream;
 import org.apache.accumulo.core.util.CachedConfiguration;
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -55,7 +55,7 @@ public class MultiLevelIndexTest extends
     FSDataOutputStream dos = new FSDataOutputStream(baos, new FileSystem.Statistics("a"));
     CachableBlockFile.Writer _cbw = new CachableBlockFile.Writer(dos, "gz", CachedConfiguration.getInstance());
     
-    Writer mliw = new Writer(_cbw, maxBlockSize);
+    BufferedWriter mliw = new BufferedWriter(new Writer(_cbw, maxBlockSize));
     
     for (int i = 0; i < num; i++)
       mliw.add(new Key(String.format("%05d000", i)), i, 0, 0, 0);



Mime
View raw message