arrow-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From w...@apache.org
Subject arrow git commit: ARROW-1613: [Java] Alternative ArrowReader close to free resources but leave ReadChannel open
Date Mon, 16 Oct 2017 19:54:04 GMT
Repository: arrow
Updated Branches:
  refs/heads/master 2f2a0c139 -> 1926bdc97


ARROW-1613: [Java] Alternative ArrowReader close to free resources but leave ReadChannel open

This adds an alternative `ArrowReader.close(boolean closeReadChannel)` that, if called with
`false`, will close reader resources such as vectors/dictionaries but leave the `ReadChannel`
open. The behavior of the default `ArrowReader.close()` is unchanged.

Author: Bryan Cutler <cutlerb@gmail.com>

Closes #1138 from BryanCutler/java-ArrowRead-not-close-input-ARROW-1613 and squashes the following
commits:

028f2cd5 [Bryan Cutler] Added docs to ArrowReader
a9125dda [Bryan Cutler] revert test that manually closed ReadChannel
103a4192 [Bryan Cutler] changed to alternate close
b735135f [Bryan Cutler] Removed closing of ReadChannel from ArrowReader.close() and updated
usage in tests to close things correctly


Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/1926bdc9
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/1926bdc9
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/1926bdc9

Branch: refs/heads/master
Commit: 1926bdc9786af8f72111b5b6520388f362bd8809
Parents: 2f2a0c1
Author: Bryan Cutler <cutlerb@gmail.com>
Authored: Mon Oct 16 15:53:58 2017 -0400
Committer: Wes McKinney <wes.mckinney@twosigma.com>
Committed: Mon Oct 16 15:53:58 2017 -0400

----------------------------------------------------------------------
 .../java/org/apache/arrow/tools/EchoServer.java |  4 +-
 .../org/apache/arrow/tools/FileRoundtrip.java   |  5 +-
 .../org/apache/arrow/tools/EchoServerTest.java  |  4 +-
 .../apache/arrow/vector/file/ArrowReader.java   | 51 ++++++++++++++++++--
 .../vector/file/TestArrowReaderWriter.java      |  4 +-
 .../arrow/vector/file/TestArrowStreamPipe.java  |  1 +
 6 files changed, 56 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/arrow/blob/1926bdc9/java/tools/src/main/java/org/apache/arrow/tools/EchoServer.java
----------------------------------------------------------------------
diff --git a/java/tools/src/main/java/org/apache/arrow/tools/EchoServer.java b/java/tools/src/main/java/org/apache/arrow/tools/EchoServer.java
index c53f0ea..3091bc4 100644
--- a/java/tools/src/main/java/org/apache/arrow/tools/EchoServer.java
+++ b/java/tools/src/main/java/org/apache/arrow/tools/EchoServer.java
@@ -95,9 +95,9 @@ public class EchoServer {
     }
 
     public void run() throws IOException {
-      BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
       // Read the entire input stream and write it back
-      try (ArrowStreamReader reader = new ArrowStreamReader(socket.getInputStream(), allocator)) {
+      try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
+           ArrowStreamReader reader = new ArrowStreamReader(socket.getInputStream(), allocator)) {
         VectorSchemaRoot root = reader.getVectorSchemaRoot();
         // load the first batch before instantiating the writer so that we have any dictionaries
         reader.loadNextBatch();

http://git-wip-us.apache.org/repos/asf/arrow/blob/1926bdc9/java/tools/src/main/java/org/apache/arrow/tools/FileRoundtrip.java
----------------------------------------------------------------------
diff --git a/java/tools/src/main/java/org/apache/arrow/tools/FileRoundtrip.java b/java/tools/src/main/java/org/apache/arrow/tools/FileRoundtrip.java
index 7d71b0b..ab8fa6e 100644
--- a/java/tools/src/main/java/org/apache/arrow/tools/FileRoundtrip.java
+++ b/java/tools/src/main/java/org/apache/arrow/tools/FileRoundtrip.java
@@ -79,8 +79,9 @@ public class FileRoundtrip {
 
       File inFile = validateFile("input", inFileName);
       File outFile = validateFile("output", outFileName);
-      BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE); // TODO: close
-      try (FileInputStream fileInputStream = new FileInputStream(inFile);
+
+      try (BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE);
+           FileInputStream fileInputStream = new FileInputStream(inFile);
            ArrowFileReader arrowReader = new ArrowFileReader(fileInputStream.getChannel(),
                allocator)) {
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/1926bdc9/java/tools/src/test/java/org/apache/arrow/tools/EchoServerTest.java
----------------------------------------------------------------------
diff --git a/java/tools/src/test/java/org/apache/arrow/tools/EchoServerTest.java b/java/tools/src/test/java/org/apache/arrow/tools/EchoServerTest.java
index 467965a..ecac6d6 100644
--- a/java/tools/src/test/java/org/apache/arrow/tools/EchoServerTest.java
+++ b/java/tools/src/test/java/org/apache/arrow/tools/EchoServerTest.java
@@ -95,9 +95,9 @@ public class EchoServerTest {
                               NullableTinyIntVector vector,
                               int batches)
       throws UnknownHostException, IOException {
-    BufferAllocator alloc = new RootAllocator(Long.MAX_VALUE);
     VectorSchemaRoot root = new VectorSchemaRoot(asList(field), asList((FieldVector) vector), 0);
-    try (Socket socket = new Socket("localhost", serverPort);
+    try (BufferAllocator alloc = new RootAllocator(Long.MAX_VALUE);
+         Socket socket = new Socket("localhost", serverPort);
          ArrowStreamWriter writer = new ArrowStreamWriter(root, null, socket.getOutputStream());
          ArrowStreamReader reader = new ArrowStreamReader(socket.getInputStream(), alloc)) {
       writer.start();

http://git-wip-us.apache.org/repos/asf/arrow/blob/1926bdc9/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowReader.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowReader.java b/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowReader.java
index 646d6fe..21fb220 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowReader.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowReader.java
@@ -41,6 +41,11 @@ import org.apache.arrow.vector.types.pojo.Field;
 import org.apache.arrow.vector.types.pojo.Schema;
 import org.apache.arrow.vector.util.DictionaryUtility;
 
+/**
+ * Abstract class to read ArrowRecordBatches from a ReadChannel.
+ *
+ * @param <T> Type of ReadChannel to use
+ */
 public abstract class ArrowReader<T extends ReadChannel> implements DictionaryProvider, AutoCloseable {
 
   private final T in;
@@ -58,7 +63,7 @@ public abstract class ArrowReader<T extends ReadChannel> implements DictionaryPr
   }
 
   /**
-   * Returns the vector schema root. This will be loaded with new values on every call to loadNextBatch
+   * Returns the vector schema root. This will be loaded with new values on every call to loadNextBatch.
    *
    * @return the vector schema root
    * @throws IOException if reading of schema fails
@@ -69,9 +74,9 @@ public abstract class ArrowReader<T extends ReadChannel> implements DictionaryPr
   }
 
   /**
-   * Returns any dictionaries
+   * Returns any dictionaries that were loaded along with ArrowRecordBatches.
    *
-   * @return dictionaries, if any
+   * @return Map of dictionaries to dictionary id, empty if no dictionaries loaded
    * @throws IOException if reading of schema fails
    */
   public Map<Long, Dictionary> getDictionaryVectors() throws IOException {
@@ -79,6 +84,12 @@ public abstract class ArrowReader<T extends ReadChannel> implements DictionaryPr
     return dictionaries;
   }
 
+  /**
+   * Lookup a dictionary that has been loaded using the dictionary id.
+   *
+   * @param id Unique identifier for a dictionary
+   * @return the requested dictionary or null if not found
+   */
   @Override
   public Dictionary lookup(long id) {
     if (!initialized) {
@@ -88,7 +99,12 @@ public abstract class ArrowReader<T extends ReadChannel> implements DictionaryPr
     return dictionaries.get(id);
   }
 
-  // Returns true if a batch was read, false on EOS
+  /**
+   * Load the next ArrowRecordBatch to the vector schema root if available.
+   *
+   * @return true if a batch was read, false on EOS
+   * @throws IOException
+   */
   public boolean loadNextBatch() throws IOException {
     ensureInitialized();
     // read in all dictionary batches, then stop after our first record batch
@@ -129,19 +145,44 @@ public abstract class ArrowReader<T extends ReadChannel> implements DictionaryPr
     return readBatch;
   }
 
+  /**
+   * Return the number of bytes read from the ReadChannel.
+   *
+   * @return number of bytes read
+   */
   public long bytesRead() {
     return in.bytesRead();
   }
 
+  /**
+   * Close resources, including vector schema root and dictionary vectors, and the
+   * underlying ReadChannel.
+   *
+   * @throws IOException
+   */
   @Override
   public void close() throws IOException {
+    close(true);
+  }
+
+  /**
+   * Close resources, including vector schema root and dictionary vectors. If the flag
+   * closeReadChannel is true then close the underlying ReadChannel, otherwise leave it open.
+   *
+   * @param closeReadChannel Flag to control if closing the underlying ReadChannel
+   * @throws IOException
+   */
+  public void close(boolean closeReadChannel) throws IOException {
     if (initialized) {
       root.close();
       for (Dictionary dictionary : dictionaries.values()) {
         dictionary.getVector().close();
       }
     }
-    in.close();
+
+    if (closeReadChannel) {
+      in.close();
+    }
   }
 
   protected abstract Schema readSchema(T in) throws IOException;

http://git-wip-us.apache.org/repos/asf/arrow/blob/1926bdc9/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowReaderWriter.java
----------------------------------------------------------------------
diff --git a/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowReaderWriter.java b/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowReaderWriter.java
index 65332aa..3ce01a2 100644
--- a/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowReaderWriter.java
+++ b/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowReaderWriter.java
@@ -92,8 +92,8 @@ public class TestArrowReaderWriter {
 
     byte[] byteArray = out.toByteArray();
 
-    SeekableReadChannel channel = new SeekableReadChannel(new ByteArrayReadableSeekableByteChannel(byteArray));
-    try (ArrowFileReader reader = new ArrowFileReader(channel, allocator)) {
+    try (SeekableReadChannel channel = new SeekableReadChannel(new ByteArrayReadableSeekableByteChannel(byteArray));
+         ArrowFileReader reader = new ArrowFileReader(channel, allocator)) {
       Schema readSchema = reader.getVectorSchemaRoot().getSchema();
       assertEquals(schema, readSchema);
       assertTrue(readSchema.getFields().get(0).getTypeLayout().getVectorTypes().toString(), readSchema.getFields().get(0).getTypeLayout().getVectors().size() > 0);

http://git-wip-us.apache.org/repos/asf/arrow/blob/1926bdc9/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowStreamPipe.java
----------------------------------------------------------------------
diff --git a/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowStreamPipe.java b/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowStreamPipe.java
index a19c379..4071694 100644
--- a/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowStreamPipe.java
+++ b/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowStreamPipe.java
@@ -141,6 +141,7 @@ public class TestArrowStreamPipe {
         while (!done) {
           assertTrue(reader.loadNextBatch());
         }
+        reader.close();
       } catch (IOException e) {
         e.printStackTrace();
         Assert.fail(e.toString()); // have to explicitly fail since we're in a separate thread


Mime
View raw message