arrow-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From w...@apache.org
Subject [1/4] arrow git commit: ARROW-542: Adding dictionary encoding to FileWriter
Date Thu, 16 Mar 2017 18:00:00 GMT
Repository: arrow
Updated Branches:
  refs/heads/master 3b650014f -> 49f666e74


http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowStream.java
----------------------------------------------------------------------
diff --git a/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowStream.java b/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowStream.java
new file mode 100644
index 0000000..e7cdf3f
--- /dev/null
+++ b/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowStream.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.vector.file;
+
+import static java.util.Arrays.asList;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+import io.netty.buffer.ArrowBuf;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.NullableTinyIntVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.schema.ArrowFieldNode;
+import org.apache.arrow.vector.schema.ArrowMessage;
+import org.apache.arrow.vector.schema.ArrowRecordBatch;
+import org.apache.arrow.vector.stream.ArrowStreamReader;
+import org.apache.arrow.vector.stream.ArrowStreamWriter;
+import org.apache.arrow.vector.stream.MessageSerializerTest;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.junit.Test;
+
+public class TestArrowStream extends BaseFileTest {
+  @Test
+  public void testEmptyStream() throws IOException {
+    Schema schema = MessageSerializerTest.testSchema();
+    VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator);
+
+    // Write the stream.
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    try (ArrowStreamWriter writer = new ArrowStreamWriter(root, null, out)) {
+    }
+
+    ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray());
+    try (ArrowStreamReader reader = new ArrowStreamReader(in, allocator)) {
+      assertEquals(schema, reader.getVectorSchemaRoot().getSchema());
+      // Empty should return nothing. Can be called repeatedly.
+      reader.loadNextBatch();
+      assertEquals(0, reader.getVectorSchemaRoot().getRowCount());
+      reader.loadNextBatch();
+      assertEquals(0, reader.getVectorSchemaRoot().getRowCount());
+    }
+  }
+
+  @Test
+  public void testReadWrite() throws IOException {
+    Schema schema = MessageSerializerTest.testSchema();
+    try (VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) {
+      int numBatches = 1;
+
+      root.getFieldVectors().get(0).allocateNew();
+      NullableTinyIntVector.Mutator mutator = (NullableTinyIntVector.Mutator) root.getFieldVectors().get(0).getMutator();
+      for (int i = 0; i < 16; i++) {
+        mutator.set(i, i < 8 ? 1 : 0, (byte)(i + 1));
+      }
+      mutator.setValueCount(16);
+      root.setRowCount(16);
+
+      ByteArrayOutputStream out = new ByteArrayOutputStream();
+      long bytesWritten = 0;
+      try (ArrowStreamWriter writer = new ArrowStreamWriter(root, null, out)) {
+        writer.start();
+        for (int i = 0; i < numBatches; i++) {
+          writer.writeBatch();
+        }
+        writer.end();
+        bytesWritten = writer.bytesWritten();
+      }
+
+      ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray());
+      try (ArrowStreamReader reader = new ArrowStreamReader(in, allocator)) {
+        Schema readSchema = reader.getVectorSchemaRoot().getSchema();
+        assertEquals(schema, readSchema);
+        for (int i = 0; i < numBatches; i++) {
+          reader.loadNextBatch();
+        }
+        // TODO figure out why reader isn't getting padding bytes
+        assertEquals(bytesWritten, reader.bytesRead() + 4);
+        reader.loadNextBatch();
+        assertEquals(0, reader.getVectorSchemaRoot().getRowCount());
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowStreamPipe.java
----------------------------------------------------------------------
diff --git a/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowStreamPipe.java
b/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowStreamPipe.java
new file mode 100644
index 0000000..46d4679
--- /dev/null
+++ b/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowStreamPipe.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.vector.file;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.nio.channels.Pipe;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.NullableTinyIntVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.schema.ArrowMessage;
+import org.apache.arrow.vector.stream.ArrowStreamReader;
+import org.apache.arrow.vector.stream.ArrowStreamWriter;
+import org.apache.arrow.vector.stream.MessageSerializerTest;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestArrowStreamPipe {
+  Schema schema = MessageSerializerTest.testSchema();
+  BufferAllocator alloc = new RootAllocator(Long.MAX_VALUE);
+
+  private final class WriterThread extends Thread {
+
+    private final int numBatches;
+    private final ArrowStreamWriter writer;
+    private final VectorSchemaRoot root;
+
+    public WriterThread(int numBatches, WritableByteChannel sinkChannel)
+        throws IOException {
+      this.numBatches = numBatches;
+      BufferAllocator allocator = alloc.newChildAllocator("writer thread", 0, Integer.MAX_VALUE);
+      root = VectorSchemaRoot.create(schema, allocator);
+      writer = new ArrowStreamWriter(root, null, sinkChannel);
+    }
+
+    @Override
+    public void run() {
+      try {
+        writer.start();
+        for (int j = 0; j < numBatches; j++) {
+          root.getFieldVectors().get(0).allocateNew();
+          NullableTinyIntVector.Mutator mutator = (NullableTinyIntVector.Mutator) root.getFieldVectors().get(0).getMutator();
+          // Send a changing batch id first
+          mutator.set(0, j);
+          for (int i = 1; i < 16; i++) {
+            mutator.set(i, i < 8 ? 1 : 0, (byte)(i + 1));
+          }
+          mutator.setValueCount(16);
+          root.setRowCount(16);
+
+          writer.writeBatch();
+        }
+        writer.close();
+        root.close();
+      } catch (IOException e) {
+        e.printStackTrace();
+        Assert.fail(e.toString()); // have to explicitly fail since we're in a separate thread
+      }
+    }
+
+    public long bytesWritten() { return writer.bytesWritten(); }
+  }
+
+  private final class ReaderThread extends Thread {
+    private int batchesRead = 0;
+    private final ArrowStreamReader reader;
+    private final BufferAllocator alloc = new RootAllocator(Long.MAX_VALUE);
+    private boolean done = false;
+
+    public ReaderThread(ReadableByteChannel sourceChannel)
+        throws IOException {
+      reader = new ArrowStreamReader(sourceChannel, alloc) {
+        @Override
+        protected ArrowMessage readMessage(ReadChannel in, BufferAllocator allocator) throws
IOException {
+          // Read all the batches. Each batch contains an incrementing id and then some
+          // constant data. Verify both.
+          ArrowMessage message = super.readMessage(in, allocator);
+          if (message == null) {
+            done = true;
+          } else {
+            batchesRead++;
+          }
+          return message;
+        }
+        @Override
+        public void loadNextBatch() throws IOException {
+          super.loadNextBatch();
+          if (!done) {
+            VectorSchemaRoot root = getVectorSchemaRoot();
+            Assert.assertEquals(16, root.getRowCount());
+            NullableTinyIntVector vector = (NullableTinyIntVector) root.getFieldVectors().get(0);
+            Assert.assertEquals((byte)(batchesRead - 1), vector.getAccessor().get(0));
+            for (int i = 1; i < 16; i++) {
+              if (i < 8) {
+                Assert.assertEquals((byte)(i + 1), vector.getAccessor().get(i));
+              } else {
+                Assert.assertTrue(vector.getAccessor().isNull(i));
+              }
+            }
+          }
+        }
+      };
+    }
+
+    @Override
+    public void run() {
+      try {
+        assertEquals(schema, reader.getVectorSchemaRoot().getSchema());
+        assertTrue(
+            reader.getVectorSchemaRoot().getSchema().getFields().get(0).getTypeLayout().getVectorTypes().toString(),
+            reader.getVectorSchemaRoot().getSchema().getFields().get(0).getTypeLayout().getVectors().size()
> 0);
+        while (!done) {
+          reader.loadNextBatch();
+        }
+      } catch (IOException e) {
+        e.printStackTrace();
+        Assert.fail(e.toString()); // have to explicitly fail since we're in a separate thread
+      }
+    }
+
+    public int getBatchesRead() { return batchesRead; }
+    public long bytesRead() { return reader.bytesRead(); }
+  }
+
+  // Starts up a producer and consumer thread to read/write batches.
+  @Test
+  public void pipeTest() throws IOException, InterruptedException {
+    int NUM_BATCHES = 10;
+    Pipe pipe = Pipe.open();
+    WriterThread writer = new WriterThread(NUM_BATCHES, pipe.sink());
+    ReaderThread reader = new ReaderThread(pipe.source());
+
+    writer.start();
+    reader.start();
+    reader.join();
+    writer.join();
+
+    assertEquals(NUM_BATCHES, reader.getBatchesRead());
+    assertEquals(writer.bytesWritten(), reader.bytesRead());
+  }
+}

http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/test/java/org/apache/arrow/vector/file/json/TestJSONFile.java
----------------------------------------------------------------------
diff --git a/java/vector/src/test/java/org/apache/arrow/vector/file/json/TestJSONFile.java
b/java/vector/src/test/java/org/apache/arrow/vector/file/json/TestJSONFile.java
index 3720a13..c88958c 100644
--- a/java/vector/src/test/java/org/apache/arrow/vector/file/json/TestJSONFile.java
+++ b/java/vector/src/test/java/org/apache/arrow/vector/file/json/TestJSONFile.java
@@ -70,7 +70,7 @@ public class TestJSONFile extends BaseFileTest {
     int count = COUNT;
     try (
         BufferAllocator vectorAllocator = allocator.newChildAllocator("original vectors",
0, Integer.MAX_VALUE);
-        NullableMapVector parent = new NullableMapVector("parent", vectorAllocator, null))
{
+        NullableMapVector parent = new NullableMapVector("parent", vectorAllocator, null,
null)) {
       writeComplexData(count, parent);
       VectorSchemaRoot root = new VectorSchemaRoot(parent.getChild("root"));
       validateComplexContent(root.getRowCount(), root);
@@ -92,7 +92,7 @@ public class TestJSONFile extends BaseFileTest {
     int count = COUNT;
     try (
         BufferAllocator vectorAllocator = allocator.newChildAllocator("original vectors",
0, Integer.MAX_VALUE);
-        NullableMapVector parent = new NullableMapVector("parent", vectorAllocator, null))
{
+        NullableMapVector parent = new NullableMapVector("parent", vectorAllocator, null,
null)) {
 
       writeUnionData(count, parent);
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/test/java/org/apache/arrow/vector/stream/MessageSerializerTest.java
----------------------------------------------------------------------
diff --git a/java/vector/src/test/java/org/apache/arrow/vector/stream/MessageSerializerTest.java
b/java/vector/src/test/java/org/apache/arrow/vector/stream/MessageSerializerTest.java
index 7b4de80..bb2ccf8 100644
--- a/java/vector/src/test/java/org/apache/arrow/vector/stream/MessageSerializerTest.java
+++ b/java/vector/src/test/java/org/apache/arrow/vector/stream/MessageSerializerTest.java
@@ -34,6 +34,7 @@ import org.apache.arrow.memory.RootAllocator;
 import org.apache.arrow.vector.file.ReadChannel;
 import org.apache.arrow.vector.file.WriteChannel;
 import org.apache.arrow.vector.schema.ArrowFieldNode;
+import org.apache.arrow.vector.schema.ArrowMessage;
 import org.apache.arrow.vector.schema.ArrowRecordBatch;
 import org.apache.arrow.vector.types.pojo.ArrowType;
 import org.apache.arrow.vector.types.pojo.Field;
@@ -88,9 +89,10 @@ public class MessageSerializerTest {
     MessageSerializer.serialize(new WriteChannel(Channels.newChannel(out)), batch);
 
     ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray());
-    ArrowRecordBatch deserialized = MessageSerializer.deserializeRecordBatch(
-        new ReadChannel(Channels.newChannel(in)), alloc);
-    verifyBatch(deserialized, validity, values);
+    ReadChannel channel = new ReadChannel(Channels.newChannel(in));
+    ArrowMessage deserialized = MessageSerializer.deserializeMessageBatch(channel, alloc);
+    assertEquals(ArrowRecordBatch.class, deserialized.getClass());
+    verifyBatch((ArrowRecordBatch) deserialized, validity, values);
   }
 
   public static Schema testSchema() {

http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/test/java/org/apache/arrow/vector/stream/TestArrowStream.java
----------------------------------------------------------------------
diff --git a/java/vector/src/test/java/org/apache/arrow/vector/stream/TestArrowStream.java
b/java/vector/src/test/java/org/apache/arrow/vector/stream/TestArrowStream.java
deleted file mode 100644
index 725272a..0000000
--- a/java/vector/src/test/java/org/apache/arrow/vector/stream/TestArrowStream.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.arrow.vector.stream;
-
-import static java.util.Arrays.asList;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-
-import org.apache.arrow.memory.BufferAllocator;
-import org.apache.arrow.memory.RootAllocator;
-import org.apache.arrow.vector.file.BaseFileTest;
-import org.apache.arrow.vector.schema.ArrowFieldNode;
-import org.apache.arrow.vector.schema.ArrowRecordBatch;
-import org.apache.arrow.vector.types.pojo.Schema;
-import org.junit.Test;
-
-import io.netty.buffer.ArrowBuf;
-
-public class TestArrowStream extends BaseFileTest {
-  @Test
-  public void testEmptyStream() throws IOException {
-    Schema schema = MessageSerializerTest.testSchema();
-
-    // Write the stream.
-    ByteArrayOutputStream out = new ByteArrayOutputStream();
-    try (ArrowStreamWriter writer = new ArrowStreamWriter(out, schema)) {
-    }
-
-    ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray());
-    try (ArrowStreamReader reader = new ArrowStreamReader(in, allocator)) {
-      reader.init();
-      assertEquals(schema, reader.getSchema());
-      // Empty should return null. Can be called repeatedly.
-      assertTrue(reader.nextRecordBatch() == null);
-      assertTrue(reader.nextRecordBatch() == null);
-    }
-  }
-
-  @Test
-  public void testReadWrite() throws IOException {
-    Schema schema = MessageSerializerTest.testSchema();
-    byte[] validity = new byte[] { (byte)255, 0};
-    // second half is "undefined"
-    byte[] values = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16};
-
-    int numBatches = 5;
-    BufferAllocator alloc = new RootAllocator(Long.MAX_VALUE);
-    ByteArrayOutputStream out = new ByteArrayOutputStream();
-    long bytesWritten = 0;
-    try (ArrowStreamWriter writer = new ArrowStreamWriter(out, schema)) {
-      ArrowBuf validityb = MessageSerializerTest.buf(alloc, validity);
-      ArrowBuf valuesb =  MessageSerializerTest.buf(alloc, values);
-      for (int i = 0; i < numBatches; i++) {
-        writer.writeRecordBatch(new ArrowRecordBatch(
-            16, asList(new ArrowFieldNode(16, 8)), asList(validityb, valuesb)));
-      }
-      bytesWritten = writer.bytesWritten();
-    }
-
-    ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray());
-    try (ArrowStreamReader reader = new ArrowStreamReader(in, alloc)) {
-      reader.init();
-      Schema readSchema = reader.getSchema();
-      for (int i = 0; i < numBatches; i++) {
-        assertEquals(schema, readSchema);
-        assertTrue(
-            readSchema.getFields().get(0).getTypeLayout().getVectorTypes().toString(),
-            readSchema.getFields().get(0).getTypeLayout().getVectors().size() > 0);
-        ArrowRecordBatch recordBatch = reader.nextRecordBatch();
-        MessageSerializerTest.verifyBatch(recordBatch, validity, values);
-        assertTrue(recordBatch != null);
-      }
-      assertTrue(reader.nextRecordBatch() == null);
-      assertEquals(bytesWritten, reader.bytesRead());
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/test/java/org/apache/arrow/vector/stream/TestArrowStreamPipe.java
----------------------------------------------------------------------
diff --git a/java/vector/src/test/java/org/apache/arrow/vector/stream/TestArrowStreamPipe.java
b/java/vector/src/test/java/org/apache/arrow/vector/stream/TestArrowStreamPipe.java
deleted file mode 100644
index aa0b77e..0000000
--- a/java/vector/src/test/java/org/apache/arrow/vector/stream/TestArrowStreamPipe.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.arrow.vector.stream;
-
-import static java.util.Arrays.asList;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-import java.nio.channels.Pipe;
-import java.nio.channels.ReadableByteChannel;
-import java.nio.channels.WritableByteChannel;
-
-import org.apache.arrow.memory.BufferAllocator;
-import org.apache.arrow.memory.RootAllocator;
-import org.apache.arrow.vector.schema.ArrowFieldNode;
-import org.apache.arrow.vector.schema.ArrowRecordBatch;
-import org.apache.arrow.vector.types.pojo.Schema;
-import org.junit.Test;
-
-import io.netty.buffer.ArrowBuf;
-
-public class TestArrowStreamPipe {
-  Schema schema = MessageSerializerTest.testSchema();
-  // second half is "undefined"
-  byte[] values = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16};
-
-  private final class WriterThread extends Thread {
-    private final int numBatches;
-    private final ArrowStreamWriter writer;
-
-    public WriterThread(int numBatches, WritableByteChannel sinkChannel)
-        throws IOException {
-      this.numBatches = numBatches;
-      writer = new ArrowStreamWriter(sinkChannel, schema);
-    }
-
-    @Override
-    public void run() {
-      BufferAllocator alloc = new RootAllocator(Long.MAX_VALUE);
-      try {
-        ArrowBuf valuesb =  MessageSerializerTest.buf(alloc, values);
-        for (int i = 0; i < numBatches; i++) {
-          // Send a changing byte id first.
-          byte[] validity = new byte[] { (byte)i, 0};
-          ArrowBuf validityb = MessageSerializerTest.buf(alloc, validity);
-          writer.writeRecordBatch(new ArrowRecordBatch(
-              16, asList(new ArrowFieldNode(16, 8)), asList(validityb, valuesb)));
-        }
-        writer.close();
-      } catch (IOException e) {
-        e.printStackTrace();
-        assertTrue(false);
-      }
-    }
-
-    public long bytesWritten() { return writer.bytesWritten(); }
-  }
-
-  private final class ReaderThread extends Thread {
-    private int batchesRead = 0;
-    private final ArrowStreamReader reader;
-    private final BufferAllocator alloc = new RootAllocator(Long.MAX_VALUE);
-
-    public ReaderThread(ReadableByteChannel sourceChannel)
-        throws IOException {
-      reader = new ArrowStreamReader(sourceChannel, alloc);
-    }
-
-    @Override
-    public void run() {
-      try {
-        reader.init();
-        assertEquals(schema, reader.getSchema());
-        assertTrue(
-            reader.getSchema().getFields().get(0).getTypeLayout().getVectorTypes().toString(),
-            reader.getSchema().getFields().get(0).getTypeLayout().getVectors().size() >
0);
-
-        // Read all the batches. Each batch contains an incrementing id and then some
-        // constant data. Verify both.
-        while (true) {
-          ArrowRecordBatch batch = reader.nextRecordBatch();
-          if (batch == null) break;
-          byte[] validity = new byte[] { (byte)batchesRead, 0};
-          MessageSerializerTest.verifyBatch(batch, validity, values);
-          batchesRead++;
-        }
-      } catch (IOException e) {
-        e.printStackTrace();
-        assertTrue(false);
-      }
-    }
-
-    public int getBatchesRead() { return batchesRead; }
-    public long bytesRead() { return reader.bytesRead(); }
-  }
-
-  // Starts up a producer and consumer thread to read/write batches.
-  @Test
-  public void pipeTest() throws IOException, InterruptedException {
-    int NUM_BATCHES = 10;
-    Pipe pipe = Pipe.open();
-    WriterThread writer = new WriterThread(NUM_BATCHES, pipe.sink());
-    ReaderThread reader = new ReaderThread(pipe.source());
-
-    writer.start();
-    reader.start();
-    reader.join();
-    writer.join();
-
-    assertEquals(NUM_BATCHES, reader.getBatchesRead());
-    assertEquals(writer.bytesWritten(), reader.bytesRead());
-  }
-}


Mime
View raw message