geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From upthewatersp...@apache.org
Subject [1/2] incubator-geode git commit: Fixing failure from RegionDirectoryJUnitTest.testCopyBytesWithThreads
Date Wed, 26 Aug 2015 20:43:18 GMT
Repository: incubator-geode
Updated Branches:
  refs/heads/feature/GEODE-11 803bebb6e -> 0c0f620b0


Fixing failure from RegionDirectoryJUnitTest.testCopyBytesWithThreads

The underlying issue was that the FileIndexInput class extended
BufferedIndexInput, whose clone method appears to have a problem.
In FileIndexInput.clone, we were cloning the input stream so that it
started at the position where the original left off (which I think is
correct), but BufferedIndexInput.clone dropped the buffer, so the next
read would miss some of the bytes that were in that buffer.

After some thought, I don't think we need BufferedIndexInput at all,
because the data is already stored in memory in a byte array in FileInputStream.
So I removed the dependency on BufferedIndexInput. As part of that work,
I added a seek() method to FileInputStream to make seek operations more
efficient.


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/7e71512f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/7e71512f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/7e71512f

Branch: refs/heads/feature/GEODE-11
Commit: 7e71512f9d84361435d865ba067c5a6a1874d9a9
Parents: 803bebb
Author: Dan Smith <upthewaterspout@apache.org>
Authored: Wed Aug 26 09:02:34 2015 -0700
Committer: Dan Smith <upthewaterspout@apache.org>
Committed: Wed Aug 26 09:02:34 2015 -0700

----------------------------------------------------------------------
 .../cache/lucene/internal/FileIndexInput.java   | 112 +++++++++++++++++++
 .../cache/lucene/internal/RegionDirectory.java  |  41 ++-----
 .../cache/lucene/internal/filesystem/File.java  |   2 +-
 .../internal/filesystem/FileInputStream.java    |  47 +++++++-
 .../internal/filesystem/FileOutputStream.java   |   8 ++
 .../lucene/internal/filesystem/FileSystem.java  |   4 -
 .../filesystem/SeekableInputStream.java         |  24 ++++
 .../internal/RegionDirectoryJUnitTest.java      |   7 --
 .../filesystem/FileSystemJUnitTest.java         | 101 ++++++++++++++++-
 9 files changed, 298 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7e71512f/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/FileIndexInput.java
----------------------------------------------------------------------
diff --git a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/FileIndexInput.java
b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/FileIndexInput.java
new file mode 100644
index 0000000..9ad20ec
--- /dev/null
+++ b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/FileIndexInput.java
@@ -0,0 +1,112 @@
+package com.gemstone.gemfire.cache.lucene.internal;
+
+import java.io.EOFException;
+import java.io.IOException;
+
+import org.apache.lucene.store.IndexInput;
+
+import com.gemstone.gemfire.cache.lucene.internal.filesystem.File;
+import com.gemstone.gemfire.cache.lucene.internal.filesystem.SeekableInputStream;
+
+final class FileIndexInput extends IndexInput {
+  private final File file;
+  SeekableInputStream in;
+  private long position;
+  
+  //Used for slice operations
+  private long sliceOffset;
+  private long sliceLength;
+  
+  FileIndexInput(String resourceDesc, File file) {
+    this(resourceDesc, file, 0L, file.getLength());
+  }
+
+  /**
+   * Constructor for a slice.
+   */
+  private FileIndexInput(String resourceDesc, File file, long offset, long length) {
+    super(resourceDesc);
+    this.file = file;
+    in = file.getInputStream();
+    this.sliceOffset = offset;
+    this.sliceLength = length;
+  }
+
+  @Override
+  public long length() {
+    return sliceLength;
+  }
+
+  @Override
+  public void close() throws IOException {
+    in.close();
+  }
+  
+  @Override
+  public FileIndexInput clone() {
+    FileIndexInput clone = (FileIndexInput)super.clone();
+    clone.in = in.clone();
+    return clone;
+  }
+
+  @Override
+  public long getFilePointer() {
+    return position;
+  }
+
+  @Override
+  public void seek(long pos) throws IOException {
+    in.seek(pos + sliceOffset);
+    this.position = pos;
+  }
+
+  @Override
+  public IndexInput slice(String sliceDescription, long offset, long length)
+      throws IOException {
+    if(length > (this.sliceLength - offset)) {
+      throw new IllegalArgumentException("Slice length is to large. Asked for " + length
+ " file length is " + sliceLength + ": " + this.file.getName());
+    }
+    if(offset < 0 || offset >= this.sliceLength) {
+      throw new IllegalArgumentException("Slice offset is invalid: " + this.file.getName());
+    }
+    
+    FileIndexInput result = new FileIndexInput(sliceDescription, file, sliceOffset + offset,
length);
+    result.seek(0);
+    return result;
+  }
+
+  @Override
+  public byte readByte() throws IOException {
+    if(++position > sliceLength) {
+      throw new EOFException("Read past end of file " + file.getName());
+    }
+    
+    int result = in.read();
+    if(result == -1) {
+      throw new EOFException("Read past end of file " + file.getName());
+    } else {
+      return (byte) result;
+    }
+  }
+
+  @Override
+  public void readBytes(byte[] b, int offset, int len) throws IOException {
+    if(len == 0) {
+      return;
+    }
+    
+    if(position + len > sliceLength) {
+      throw new EOFException("Read past end of file " + file.getName());
+    }
+    
+    //For the FileSystemInputStream, it will always read all bytes, up
+    //until the end of the file. So if we didn't get enough bytes, it's
+    //because we reached the end of the file.
+    int numRead = in.read(b, offset, len);
+    if(numRead < len) {
+      throw new EOFException("Read past end of file " + file.getName());
+    }
+    
+    position+=len;
+  } 
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7e71512f/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/RegionDirectory.java
----------------------------------------------------------------------
diff --git a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/RegionDirectory.java
b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/RegionDirectory.java
index 903eb44..2e2dd14 100644
--- a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/RegionDirectory.java
+++ b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/RegionDirectory.java
@@ -1,7 +1,6 @@
 package com.gemstone.gemfire.cache.lucene.internal;
 
 import java.io.IOException;
-import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.Collection;
 import java.util.concurrent.ConcurrentMap;
@@ -28,41 +27,15 @@ import com.gemstone.gemfire.cache.lucene.internal.filesystem.FileSystem;
 import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
 import com.gemstone.gemfire.internal.logging.LogService;
 
+/**
+ * An implementation of Directory that stores data in geode regions.
+ * 
+ * Directory is an interface to file/RAM storage for lucene. This class uses
+ * the {@link FileSystem} class to store the data in the provided geode
+ * regions.
+ */
 public class RegionDirectory extends BaseDirectory {
 
-  private static final class FileIndexInput extends BufferedIndexInput {
-    private final File file;
-    InputStream in;
-
-    private FileIndexInput(String resourceDesc, File file) {
-      super(resourceDesc);
-      this.file = file;
-      in = file.getInputStream();
-    }
-
-    @Override
-    public long length() {
-      return file.getLength();
-    }
-
-    @Override
-    public void close() throws IOException {
-      in.close();
-    }
-
-    @Override
-    protected void seekInternal(long pos) throws IOException {
-      in.close();
-      in = file.getInputStream();
-      in.skip(pos);
-    }
-
-    @Override
-    protected void readInternal(byte[] b, int offset, int length) throws IOException {
-      in.read(b, offset, length);
-    }
-  }
-
   static private final boolean CREATE_CACHE = Boolean.getBoolean("lucene.createCache");
   private static final Logger logger = LogService.getLogger();
   

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7e71512f/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/filesystem/File.java
----------------------------------------------------------------------
diff --git a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/filesystem/File.java
b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/filesystem/File.java
index 388e469..894ef4c 100644
--- a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/filesystem/File.java
+++ b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/filesystem/File.java
@@ -60,7 +60,7 @@ public class File implements Serializable {
    * 
    * The input stream is not threadsafe
    */
-  public InputStream getInputStream() {
+  public SeekableInputStream getInputStream() {
     // TODO get read lock?
     return new FileInputStream(this);
   }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7e71512f/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/filesystem/FileInputStream.java
----------------------------------------------------------------------
diff --git a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/filesystem/FileInputStream.java
b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/filesystem/FileInputStream.java
index 5304a55..bcb0821 100644
--- a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/filesystem/FileInputStream.java
+++ b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/filesystem/FileInputStream.java
@@ -1,5 +1,6 @@
 package com.gemstone.gemfire.cache.lucene.internal.filesystem;
 
+import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
 
@@ -9,7 +10,7 @@ import java.io.InputStream;
  * will keep going back to the region to look for
  * chunks until nothing is found.
  */
-final class FileInputStream extends InputStream {
+final class FileInputStream extends SeekableInputStream {
 
   private final File file;
   private byte[] chunk = null;
@@ -22,6 +23,14 @@ final class FileInputStream extends InputStream {
     nextChunk();
   }
 
+  public FileInputStream(FileInputStream other) {
+    this.file = other.file;
+    this.chunk = other.chunk;
+    this.chunkId = other.chunkId;
+    this.chunkPosition = other.chunkPosition;
+    this.open = other.open;
+  }
+
   @Override
   public int read() throws IOException {
     assertOpen();
@@ -34,6 +43,37 @@ final class FileInputStream extends InputStream {
 
     return chunk[chunkPosition++] & 0xff;
   }
+  
+  @Override
+  public void seek(long position) throws IOException {
+    if(position > file.length) {
+      throw new EOFException();
+    }
+    int targetChunk = (int) (position / file.getChunkSize());
+    int targetPosition = (int) (position % file.getChunkSize());
+    
+    if(targetChunk != (this.chunkId - 1)) {
+      chunk = file.getFileSystem().getChunk(this.file, targetChunk);
+      chunkId = targetChunk + 1;
+      chunkPosition = targetPosition;
+    } else {
+      chunkPosition = targetPosition;
+    }
+  }
+  
+  
+
+  @Override
+  public long skip(long n) throws IOException {
+    int currentPosition = (chunkId - 1) * file.getChunkSize() + chunkPosition;
+    seek(currentPosition + n);
+    return n;
+  }
+  
+  @Override
+  public void reset() throws IOException {
+    seek(0);
+  }
 
   @Override
   public int read(byte[] b, int off, int len) throws IOException {
@@ -100,4 +140,9 @@ final class FileInputStream extends InputStream {
       throw new IOException("Closed");
     }
   }
+  
+  @Override
+  public FileInputStream clone() {
+    return new FileInputStream(this);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7e71512f/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/filesystem/FileOutputStream.java
----------------------------------------------------------------------
diff --git a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/filesystem/FileOutputStream.java
b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/filesystem/FileOutputStream.java
index e7f84c3..ea80d78 100644
--- a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/filesystem/FileOutputStream.java
+++ b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/filesystem/FileOutputStream.java
@@ -18,6 +18,14 @@ final class FileOutputStream extends OutputStream {
     buffer = ByteBuffer.allocate(file.getChunkSize());
     this.length = file.length;
     this.chunks = file.chunks;
+    if(chunks > 0 && file.length % file.getChunkSize() != 0) {
+      //If the last chunk was incomplete, we're going to update it
+      //rather than add a new chunk. This guarantees that all chunks
+      //are full except for the last chunk.
+      chunks--;
+      byte[] previousChunkData = file.getFileSystem().getChunk(file, chunks);
+      buffer.put(previousChunkData);
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7e71512f/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/filesystem/FileSystem.java
----------------------------------------------------------------------
diff --git a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/filesystem/FileSystem.java
b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/filesystem/FileSystem.java
index ef69322..2a7c22b 100644
--- a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/filesystem/FileSystem.java
+++ b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/filesystem/FileSystem.java
@@ -125,8 +125,4 @@ public class FileSystem {
   void updateFile(File file) {
     fileRegion.put(file.getName(), file);
   }
-
-  
-
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7e71512f/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/filesystem/SeekableInputStream.java
----------------------------------------------------------------------
diff --git a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/filesystem/SeekableInputStream.java
b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/filesystem/SeekableInputStream.java
new file mode 100644
index 0000000..abf3268
--- /dev/null
+++ b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/filesystem/SeekableInputStream.java
@@ -0,0 +1,24 @@
+package com.gemstone.gemfire.cache.lucene.internal.filesystem;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * An input stream that supports seeking to a particular position.
+ */
+public abstract class SeekableInputStream extends InputStream {
+  
+  /**
+   * Seek to a position in the stream. The position is relative to the beginning
+   * of the stream (in other words, just before the first byte that was ever
+   * read).
+   * 
+   * @param position
+   * @throws IOException if the seek goes past the end of the stream
+   */
+  public abstract void seek(long position) throws IOException;
+  
+  public abstract SeekableInputStream clone();
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7e71512f/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/RegionDirectoryJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/RegionDirectoryJUnitTest.java
b/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/RegionDirectoryJUnitTest.java
index 8908a4a..3ff825e 100644
--- a/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/RegionDirectoryJUnitTest.java
+++ b/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/RegionDirectoryJUnitTest.java
@@ -34,11 +34,4 @@ public class RegionDirectoryJUnitTest extends BaseDirectoryTestCase {
   protected Directory getDirectory(Path path) throws IOException {
     return new RegionDirectory(new ConcurrentHashMap<String, File>(), new ConcurrentHashMap<ChunkKey,
byte[]>());
   }
-
-  @Override
-  public void testCopyBytesWithThreads() throws Exception {
-    //TODO - this method is currently failing
-  }
-  
-  
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7e71512f/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/filesystem/FileSystemJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/filesystem/FileSystemJUnitTest.java
b/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/filesystem/FileSystemJUnitTest.java
index f29236a..8f1b7dc 100644
--- a/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/filesystem/FileSystemJUnitTest.java
+++ b/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/filesystem/FileSystemJUnitTest.java
@@ -78,7 +78,7 @@ public class FileSystemJUnitTest {
     
     assertEquals(3 + LARGE_CHUNK + SMALL_CHUNK, file1.getLength());
 
-    //Make sure we can read all fo the data back and it matches
+    //Make sure we can read all of the data back and it matches
     InputStream is = file1.getInputStream();
     
     assertEquals(2, is.read());
@@ -119,6 +119,105 @@ public class FileSystemJUnitTest {
   }
   
   /**
+   * A test of cloning a a FileInputStream. The
+   * clone should start from where the original was positioned,
+   * but they should not hurt each other.
+   */
+  @Test
+  public void testCloneReader() throws IOException {
+    File file1= system.createFile("testFile1");
+    
+    byte[] data = writeRandomBytes(file1);
+    
+    SeekableInputStream in = file1.getInputStream();
+    
+    //Read to partway through the file
+    byte[] results1 = new byte[data.length];
+    in.read(results1, 0, SMALL_CHUNK);
+    
+    
+    //Clone the input stream. Both copies should
+    //now be positioned partway through the file.
+    SeekableInputStream in2 = in.clone();
+    
+    byte[] results2 = new byte[data.length];
+    
+    //Fill in the beginning of results2 with the data that it missed
+    //to make testing easier.
+    System.arraycopy(data, 0, results2, 0, SMALL_CHUNK);
+    
+    
+    //Read the rest of the file with both copies
+    in2.read(results2, SMALL_CHUNK, data.length);
+    in.read(results1, SMALL_CHUNK, data.length);
+    
+    //Both readers should have started from the same place
+    //and copied the rest of the data from the file
+    assertArrayEquals(data,results1);
+    assertArrayEquals(data,results2);
+  }
+  
+  /**
+   * A test that skip can jump to the correct position in the stream
+   */
+  @Test
+  public void testSeek() throws IOException {
+    File file= system.createFile("testFile1");
+
+    ByteArrayOutputStream expected = new ByteArrayOutputStream();
+    byte[] data = new byte[SMALL_CHUNK];
+    
+    //Write multiple times to the file with a lot of small chunks
+    while(expected.size() < FileSystem.CHUNK_SIZE + 1) {
+      rand.nextBytes(data);
+
+      expected.write(data);
+      writeBytes(file, data);
+    }
+    
+    byte[] expectedBytes = expected.toByteArray();
+    assertContents(expectedBytes, file);
+    
+    //Assert that there are only 2 chunks in the system, since we wrote just
+    //past the end of the first chunk.
+    assertEquals(2, chunkRegion.size());
+    
+    SeekableInputStream in = file.getInputStream();
+    
+    //Seek to several positions in the first chunk
+    checkByte(5, in, expectedBytes);
+    checkByte(50, in, expectedBytes);
+    checkByte(103, in, expectedBytes);
+    checkByte(1, in, expectedBytes);
+    
+    //Seek back and forth between chunks
+    checkByte(FileSystem.CHUNK_SIZE + 2, in, expectedBytes);
+    checkByte(23, in, expectedBytes);
+    checkByte(FileSystem.CHUNK_SIZE + 10, in, expectedBytes);
+    checkByte(1023, in, expectedBytes);
+    
+    //Read the remaining data after a seek
+    
+    in.seek(10);
+    byte[] results = new byte[expectedBytes.length];
+    
+    //Fill in the initial 10 bytes with the expected value
+    System.arraycopy(expectedBytes, 0, results, 0, 10);
+    
+    assertEquals(results.length - 10, in.read(results, 10, results.length-10));
+    assertEquals(-1, in.read());
+    
+    assertArrayEquals(expectedBytes, results);
+  }
+  
+  private void checkByte(int i, SeekableInputStream in, byte[] expectedBytes) throws IOException
{
+    in.seek(i);
+    byte result = (byte) in.read();
+    
+    assertEquals(expectedBytes[i], result);
+  }
+
+  /**
    * Test basic file operations - rename, delete, listFiles.
    * @throws IOException
    */


Mime
View raw message