Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 3BFF4200D21 for ; Mon, 16 Oct 2017 21:54:06 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 3A9DB1609EF; Mon, 16 Oct 2017 19:54:06 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 577321609E3 for ; Mon, 16 Oct 2017 21:54:05 +0200 (CEST) Received: (qmail 18567 invoked by uid 500); 16 Oct 2017 19:54:04 -0000 Mailing-List: contact commits-help@arrow.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@arrow.apache.org Delivered-To: mailing list commits@arrow.apache.org Received: (qmail 18557 invoked by uid 99); 16 Oct 2017 19:54:04 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 16 Oct 2017 19:54:04 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 6E764DFAEB; Mon, 16 Oct 2017 19:54:04 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: wesm@apache.org To: commits@arrow.apache.org Message-Id: <54cfb3036c4548ec81d4325d997434ae@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: arrow git commit: ARROW-1613: [Java] Alternative ArrowReader close to free resources but leave ReadChannel open Date: Mon, 16 Oct 2017 19:54:04 +0000 (UTC) archived-at: Mon, 16 Oct 2017 19:54:06 -0000 Repository: arrow Updated Branches: refs/heads/master 2f2a0c139 -> 1926bdc97 ARROW-1613: [Java] Alternative ArrowReader close to free resources but leave ReadChannel open This adds an alternative `ArrowReader.close(boolean closeReadChannel)` that if called with 
`false` will close reader resources such as vectors/dictionaries but leave the `ReadChannel` open. The behavior of the default `ArrowReader.close()` is unchanged. Author: Bryan Cutler Closes #1138 from BryanCutler/java-ArrowRead-not-close-input-ARROW-1613 and squashes the following commits: 028f2cd5 [Bryan Cutler] Added docs to ArrowReader a9125dda [Bryan Cutler] revert test that manually closed ReadChannel 103a4192 [Bryan Cutler] changed to alternate close b735135f [Bryan Cutler] Removed closing of ReadChannel from ArrowReader.close() and updated usage in tests to close things correctly Project: http://git-wip-us.apache.org/repos/asf/arrow/repo Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/1926bdc9 Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/1926bdc9 Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/1926bdc9 Branch: refs/heads/master Commit: 1926bdc9786af8f72111b5b6520388f362bd8809 Parents: 2f2a0c1 Author: Bryan Cutler Authored: Mon Oct 16 15:53:58 2017 -0400 Committer: Wes McKinney Committed: Mon Oct 16 15:53:58 2017 -0400 ---------------------------------------------------------------------- .../java/org/apache/arrow/tools/EchoServer.java | 4 +- .../org/apache/arrow/tools/FileRoundtrip.java | 5 +- .../org/apache/arrow/tools/EchoServerTest.java | 4 +- .../apache/arrow/vector/file/ArrowReader.java | 51 ++++++++++++++++++-- .../vector/file/TestArrowReaderWriter.java | 4 +- .../arrow/vector/file/TestArrowStreamPipe.java | 1 + 6 files changed, 56 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/arrow/blob/1926bdc9/java/tools/src/main/java/org/apache/arrow/tools/EchoServer.java ---------------------------------------------------------------------- diff --git a/java/tools/src/main/java/org/apache/arrow/tools/EchoServer.java b/java/tools/src/main/java/org/apache/arrow/tools/EchoServer.java index c53f0ea..3091bc4 100644 --- 
a/java/tools/src/main/java/org/apache/arrow/tools/EchoServer.java +++ b/java/tools/src/main/java/org/apache/arrow/tools/EchoServer.java @@ -95,9 +95,9 @@ public class EchoServer { } public void run() throws IOException { - BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE); // Read the entire input stream and write it back - try (ArrowStreamReader reader = new ArrowStreamReader(socket.getInputStream(), allocator)) { + try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE); + ArrowStreamReader reader = new ArrowStreamReader(socket.getInputStream(), allocator)) { VectorSchemaRoot root = reader.getVectorSchemaRoot(); // load the first batch before instantiating the writer so that we have any dictionaries reader.loadNextBatch(); http://git-wip-us.apache.org/repos/asf/arrow/blob/1926bdc9/java/tools/src/main/java/org/apache/arrow/tools/FileRoundtrip.java ---------------------------------------------------------------------- diff --git a/java/tools/src/main/java/org/apache/arrow/tools/FileRoundtrip.java b/java/tools/src/main/java/org/apache/arrow/tools/FileRoundtrip.java index 7d71b0b..ab8fa6e 100644 --- a/java/tools/src/main/java/org/apache/arrow/tools/FileRoundtrip.java +++ b/java/tools/src/main/java/org/apache/arrow/tools/FileRoundtrip.java @@ -79,8 +79,9 @@ public class FileRoundtrip { File inFile = validateFile("input", inFileName); File outFile = validateFile("output", outFileName); - BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE); // TODO: close - try (FileInputStream fileInputStream = new FileInputStream(inFile); + + try (BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE); + FileInputStream fileInputStream = new FileInputStream(inFile); ArrowFileReader arrowReader = new ArrowFileReader(fileInputStream.getChannel(), allocator)) { http://git-wip-us.apache.org/repos/asf/arrow/blob/1926bdc9/java/tools/src/test/java/org/apache/arrow/tools/EchoServerTest.java 
---------------------------------------------------------------------- diff --git a/java/tools/src/test/java/org/apache/arrow/tools/EchoServerTest.java b/java/tools/src/test/java/org/apache/arrow/tools/EchoServerTest.java index 467965a..ecac6d6 100644 --- a/java/tools/src/test/java/org/apache/arrow/tools/EchoServerTest.java +++ b/java/tools/src/test/java/org/apache/arrow/tools/EchoServerTest.java @@ -95,9 +95,9 @@ public class EchoServerTest { NullableTinyIntVector vector, int batches) throws UnknownHostException, IOException { - BufferAllocator alloc = new RootAllocator(Long.MAX_VALUE); VectorSchemaRoot root = new VectorSchemaRoot(asList(field), asList((FieldVector) vector), 0); - try (Socket socket = new Socket("localhost", serverPort); + try (BufferAllocator alloc = new RootAllocator(Long.MAX_VALUE); + Socket socket = new Socket("localhost", serverPort); ArrowStreamWriter writer = new ArrowStreamWriter(root, null, socket.getOutputStream()); ArrowStreamReader reader = new ArrowStreamReader(socket.getInputStream(), alloc)) { writer.start(); http://git-wip-us.apache.org/repos/asf/arrow/blob/1926bdc9/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowReader.java ---------------------------------------------------------------------- diff --git a/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowReader.java b/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowReader.java index 646d6fe..21fb220 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowReader.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowReader.java @@ -41,6 +41,11 @@ import org.apache.arrow.vector.types.pojo.Field; import org.apache.arrow.vector.types.pojo.Schema; import org.apache.arrow.vector.util.DictionaryUtility; +/** + * Abstract class to read ArrowRecordBatches from a ReadChannel. 
+ * + * @param <T> Type of ReadChannel to use + */ public abstract class ArrowReader<T extends ReadChannel> implements DictionaryProvider, AutoCloseable { private final T in; @@ -58,7 +63,7 @@ public abstract class ArrowReader<T extends ReadChannel> implements DictionaryPr } /** - * Returns the vector schema root. This will be loaded with new values on every call to loadNextBatch + * Returns the vector schema root. This will be loaded with new values on every call to loadNextBatch. * * @return the vector schema root * @throws IOException if reading of schema fails @@ -69,9 +74,9 @@ public abstract class ArrowReader<T extends ReadChannel> implements DictionaryPr } /** - * Returns any dictionaries + * Returns any dictionaries that were loaded along with ArrowRecordBatches. * - * @return dictionaries, if any + * @return Map of dictionaries to dictionary id, empty if no dictionaries loaded * @throws IOException if reading of schema fails */ public Map<Long, Dictionary> getDictionaryVectors() throws IOException { @@ -79,6 +84,12 @@ public abstract class ArrowReader<T extends ReadChannel> implements DictionaryPr return dictionaries; } + /** + * Lookup a dictionary that has been loaded using the dictionary id. + * + * @param id Unique identifier for a dictionary + * @return the requested dictionary or null if not found + */ @Override public Dictionary lookup(long id) { if (!initialized) { @@ -88,7 +99,12 @@ public abstract class ArrowReader<T extends ReadChannel> implements DictionaryPr return dictionaries.get(id); } - // Returns true if a batch was read, false on EOS + /** + * Load the next ArrowRecordBatch to the vector schema root if available. + * + * @return true if a batch was read, false on EOS + * @throws IOException + */ public boolean loadNextBatch() throws IOException { ensureInitialized(); // read in all dictionary batches, then stop after our first record batch @@ -129,19 +145,44 @@ public abstract class ArrowReader<T extends ReadChannel> implements DictionaryPr return readBatch; } + /** + * Return the number of bytes read from the ReadChannel.
+ * + * @return number of bytes read + */ public long bytesRead() { return in.bytesRead(); } + /** + * Close resources, including vector schema root and dictionary vectors, and the + * underlying ReadChannel. + * + * @throws IOException + */ @Override public void close() throws IOException { + close(true); + } + + /** + * Close resources, including vector schema root and dictionary vectors. If the flag + * closeReadChannel is true then close the underlying ReadChannel, otherwise leave it open. + * + * @param closeReadChannel Flag to control if closing the underlying ReadChannel + * @throws IOException + */ + public void close(boolean closeReadChannel) throws IOException { if (initialized) { root.close(); for (Dictionary dictionary : dictionaries.values()) { dictionary.getVector().close(); } } - in.close(); + + if (closeReadChannel) { + in.close(); + } } protected abstract Schema readSchema(T in) throws IOException; http://git-wip-us.apache.org/repos/asf/arrow/blob/1926bdc9/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowReaderWriter.java ---------------------------------------------------------------------- diff --git a/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowReaderWriter.java b/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowReaderWriter.java index 65332aa..3ce01a2 100644 --- a/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowReaderWriter.java +++ b/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowReaderWriter.java @@ -92,8 +92,8 @@ public class TestArrowReaderWriter { byte[] byteArray = out.toByteArray(); - SeekableReadChannel channel = new SeekableReadChannel(new ByteArrayReadableSeekableByteChannel(byteArray)); - try (ArrowFileReader reader = new ArrowFileReader(channel, allocator)) { + try (SeekableReadChannel channel = new SeekableReadChannel(new ByteArrayReadableSeekableByteChannel(byteArray)); + ArrowFileReader reader = new ArrowFileReader(channel, allocator)) { Schema 
readSchema = reader.getVectorSchemaRoot().getSchema(); assertEquals(schema, readSchema); assertTrue(readSchema.getFields().get(0).getTypeLayout().getVectorTypes().toString(), readSchema.getFields().get(0).getTypeLayout().getVectors().size() > 0); http://git-wip-us.apache.org/repos/asf/arrow/blob/1926bdc9/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowStreamPipe.java ---------------------------------------------------------------------- diff --git a/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowStreamPipe.java b/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowStreamPipe.java index a19c379..4071694 100644 --- a/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowStreamPipe.java +++ b/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowStreamPipe.java @@ -141,6 +141,7 @@ public class TestArrowStreamPipe { while (!done) { assertTrue(reader.loadNextBatch()); } + reader.close(); } catch (IOException e) { e.printStackTrace(); Assert.fail(e.toString()); // have to explicitly fail since we're in a separate thread