Repository: parquet-mr
Updated Branches:
refs/heads/master 81f480149 -> 8bfd9b4d8
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
index 9512b93..bdde70e 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
@@ -28,6 +28,8 @@ import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.column.ParquetProperties.WriterVersion;
import org.apache.parquet.hadoop.api.WriteSupport;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.hadoop.util.HadoopOutputFile;
+import org.apache.parquet.io.OutputFile;
import org.apache.parquet.schema.MessageType;
/**
@@ -219,7 +221,8 @@ public class ParquetWriter<T> implements Closeable {
boolean validating,
WriterVersion writerVersion,
Configuration conf) throws IOException {
- this(file, mode, writeSupport, compressionCodecName, blockSize,
+ this(HadoopOutputFile.fromPath(file, conf),
+ mode, writeSupport, compressionCodecName, blockSize,
validating, conf, MAX_PADDING_SIZE_DEFAULT,
ParquetProperties.builder()
.withPageSize(pageSize)
@@ -257,11 +260,11 @@ public class ParquetWriter<T> implements Closeable {
}
ParquetWriter(
- Path file,
+ OutputFile file,
ParquetFileWriter.Mode mode,
WriteSupport<T> writeSupport,
CompressionCodecName compressionCodecName,
- int blockSize,
+ int rowGroupSize,
boolean validating,
Configuration conf,
int maxPaddingSize,
@@ -271,7 +274,7 @@ public class ParquetWriter<T> implements Closeable {
MessageType schema = writeContext.getSchema();
ParquetFileWriter fileWriter = new ParquetFileWriter(
- conf, schema, file, mode, blockSize, maxPaddingSize);
+ file, schema, mode, rowGroupSize, maxPaddingSize);
fileWriter.start();
this.codecFactory = new CodecFactory(conf, encodingProps.getPageSizeThreshold());
@@ -281,7 +284,7 @@ public class ParquetWriter<T> implements Closeable {
writeSupport,
schema,
writeContext.getExtraMetaData(),
- blockSize,
+ rowGroupSize,
compressor,
validating,
encodingProps);
@@ -324,7 +327,8 @@ public class ParquetWriter<T> implements Closeable {
* @param <SELF> The type of this builder that is returned by builder methods
*/
public abstract static class Builder<T, SELF extends Builder<T, SELF>> {
- private final Path file;
+ private OutputFile file = null;
+ private Path path = null;
private Configuration conf = new Configuration();
private ParquetFileWriter.Mode mode;
private CompressionCodecName codecName = DEFAULT_COMPRESSION_CODEC_NAME;
@@ -334,8 +338,12 @@ public class ParquetWriter<T> implements Closeable {
private ParquetProperties.Builder encodingPropsBuilder =
ParquetProperties.builder();
- protected Builder(Path file) {
- this.file = file;
+ protected Builder(Path path) {
+ this.path = path;
+ }
+
+ protected Builder(OutputFile path) {
+ this.file = path;
}
/**
@@ -485,15 +493,35 @@ public class ParquetWriter<T> implements Closeable {
}
/**
+ * Set a property that will be available to the write path. For writers that use a Hadoop
+ * configuration, this is the recommended way to add configuration values.
+ *
+ * @param property a String property name
+ * @param value a String property value
+ * @return this builder for method chaining.
+ */
+ public SELF config(String property, String value) {
+ conf.set(property, value);
+ return self();
+ }
+
+ /**
* Build a {@link ParquetWriter} with the accumulated configuration.
*
* @return a configured {@code ParquetWriter} instance.
* @throws IOException
*/
public ParquetWriter<T> build() throws IOException {
- return new ParquetWriter<T>(file, mode, getWriteSupport(conf), codecName,
- rowGroupSize, enableValidation, conf, maxPaddingSize,
- encodingPropsBuilder.build());
+ if (file != null) {
+ return new ParquetWriter<>(file,
+ mode, getWriteSupport(conf), codecName, rowGroupSize, enableValidation, conf,
+ maxPaddingSize, encodingPropsBuilder.build());
+ } else {
+ return new ParquetWriter<>(HadoopOutputFile.fromPath(path, conf),
+ mode, getWriteSupport(conf), codecName,
+ rowGroupSize, enableValidation, conf, maxPaddingSize,
+ encodingPropsBuilder.build());
+ }
}
}
}
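A hedged usage sketch of the builder changes above (illustrative, not part of this commit): the subclass name, file path, and property key are made up here. It shows how build() resolves a Path to a HadoopOutputFile when no OutputFile was supplied, and how config() populates the builder's Hadoop Configuration.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.hadoop.ParquetWriter;
    import org.apache.parquet.hadoop.api.WriteSupport;

    public class SketchWriterBuilder<T> extends ParquetWriter.Builder<T, SketchWriterBuilder<T>> {
      private final WriteSupport<T> writeSupport;

      public SketchWriterBuilder(Path path, WriteSupport<T> writeSupport) {
        super(path);                 // build() wraps this Path via HadoopOutputFile.fromPath(path, conf)
        this.writeSupport = writeSupport;
      }

      @Override
      protected SketchWriterBuilder<T> self() {
        return this;
      }

      @Override
      protected WriteSupport<T> getWriteSupport(Configuration conf) {
        return writeSupport;
      }
    }

    // Illustrative call site: the config() value is stored in the builder's Configuration,
    // which build() hands to getWriteSupport(conf) and to the ParquetWriter constructor.
    // ParquetWriter<T> writer = new SketchWriterBuilder<>(new Path("/tmp/sketch.parquet"), support)
    //     .config("parquet.example.property", "value")
    //     .build();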
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/UnmaterializableRecordCounter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/UnmaterializableRecordCounter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/UnmaterializableRecordCounter.java
index 4696319..a70a0d0 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/UnmaterializableRecordCounter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/UnmaterializableRecordCounter.java
@@ -20,10 +20,12 @@ package org.apache.parquet.hadoop;
import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.io.ParquetDecodingException;
import org.apache.parquet.io.api.RecordMaterializer.RecordMaterializationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Map;
// Essentially taken from:
// https://github.com/twitter/elephant-bird/blob/master/core/src/main/java/com/twitter/elephantbird/mapreduce/input/LzoRecordReader.java#L124
@@ -60,6 +62,10 @@ public class UnmaterializableRecordCounter {
);
}
+ public UnmaterializableRecordCounter(ParquetReadOptions options, long totalNumRecords) {
+ this(getFloat(options, BAD_RECORD_THRESHOLD_CONF_KEY, DEFAULT_THRESHOLD), totalNumRecords);
+ }
+
public UnmaterializableRecordCounter(double errorThreshold, long totalNumRecords) {
this.errorThreshold = errorThreshold;
this.totalNumRecords = totalNumRecords;
@@ -85,4 +91,13 @@ public class UnmaterializableRecordCounter {
throw new ParquetDecodingException(message, cause);
}
}
+
+ private static float getFloat(ParquetReadOptions options, String key, float defaultValue) {
+ String value = options.getProperty(key);
+ if (value != null) {
+ return Float.valueOf(value);
+ } else {
+ return defaultValue;
+ }
+ }
}
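A hedged sketch of constructing the counter from the new ParquetReadOptions overload. The builder call, the literal property key, and the 0.01 threshold are assumptions for illustration; the key is expected to match BAD_RECORD_THRESHOLD_CONF_KEY.

    import org.apache.parquet.ParquetReadOptions;

    // fragment, inside some read-path setup method
    ParquetReadOptions options = ParquetReadOptions.builder()
        .set("parquet.read.bad.record.threshold", "0.01")   // assumed key string; tolerate up to 1% bad records
        .build();
    UnmaterializableRecordCounter counter =
        new UnmaterializableRecordCounter(options, totalNumRecords);   // totalNumRecords assumed in scope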
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/CompressionCodecNotSupportedException.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/CompressionCodecNotSupportedException.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/CompressionCodecNotSupportedException.java
deleted file mode 100644
index 9657cc1..0000000
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/CompressionCodecNotSupportedException.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.parquet.hadoop.codec;
-
-/**
- * This exception will be thrown when the codec is not supported by parquet, meaning there is no
- * matching codec defined in {@link org.apache.parquet.hadoop.metadata.CompressionCodecName}
- */
-public class CompressionCodecNotSupportedException extends RuntimeException {
- private final Class codecClass;
-
- public CompressionCodecNotSupportedException(Class codecClass) {
- super("codec not supported: " + codecClass.getName());
- this.codecClass = codecClass;
- }
-
- public Class getCodecClass() {
- return codecClass;
- }
-}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/CompressionCodecName.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/CompressionCodecName.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/CompressionCodecName.java
deleted file mode 100644
index 153133e..0000000
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/CompressionCodecName.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.parquet.hadoop.metadata;
-
-import org.apache.parquet.format.CompressionCodec;
-import org.apache.parquet.hadoop.codec.CompressionCodecNotSupportedException;
-
-import java.util.Locale;
-
-public enum CompressionCodecName {
- UNCOMPRESSED(null, CompressionCodec.UNCOMPRESSED, ""),
- SNAPPY("org.apache.parquet.hadoop.codec.SnappyCodec", CompressionCodec.SNAPPY, ".snappy"),
- GZIP("org.apache.hadoop.io.compress.GzipCodec", CompressionCodec.GZIP, ".gz"),
- LZO("com.hadoop.compression.lzo.LzoCodec", CompressionCodec.LZO, ".lzo"),
- BROTLI("org.apache.hadoop.io.compress.BrotliCodec", CompressionCodec.BROTLI, ".br"),
- LZ4("org.apache.hadoop.io.compress.Lz4Codec", CompressionCodec.LZ4, ".lz4"),
- ZSTD("org.apache.hadoop.io.compress.ZStandardCodec", CompressionCodec.ZSTD, ".zstd");
-
- public static CompressionCodecName fromConf(String name) {
- if (name == null) {
- return UNCOMPRESSED;
- }
- return valueOf(name.toUpperCase(Locale.ENGLISH));
- }
-
- public static CompressionCodecName fromCompressionCodec(Class<?> clazz) {
- if (clazz == null) {
- return UNCOMPRESSED;
- }
- String name = clazz.getName();
- for (CompressionCodecName codec : CompressionCodecName.values()) {
- if (name.equals(codec.getHadoopCompressionCodecClassName())) {
- return codec;
- }
- }
- throw new CompressionCodecNotSupportedException(clazz);
- }
-
- public static CompressionCodecName fromParquet(CompressionCodec codec) {
- for (CompressionCodecName codecName : CompressionCodecName.values()) {
- if (codec.equals(codecName.parquetCompressionCodec)) {
- return codecName;
- }
- }
- throw new IllegalArgumentException("Unknown compression codec " + codec);
- }
-
- private final String hadoopCompressionCodecClass;
- private final CompressionCodec parquetCompressionCodec;
- private final String extension;
-
- private CompressionCodecName(String hadoopCompressionCodecClass, CompressionCodec parquetCompressionCodec, String extension) {
- this.hadoopCompressionCodecClass = hadoopCompressionCodecClass;
- this.parquetCompressionCodec = parquetCompressionCodec;
- this.extension = extension;
- }
-
- public String getHadoopCompressionCodecClassName() {
- return hadoopCompressionCodecClass;
- }
-
- public Class getHadoopCompressionCodecClass() {
- String codecClassName = getHadoopCompressionCodecClassName();
- if (codecClassName==null) {
- return null;
- }
- try {
- return Class.forName(codecClassName);
- } catch (ClassNotFoundException e) {
- return null;
- }
- }
-
- public CompressionCodec getParquetCompressionCodec() {
- return parquetCompressionCodec;
- }
-
- public String getExtension() {
- return extension;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H1SeekableInputStream.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H1SeekableInputStream.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H1SeekableInputStream.java
index 4a03b1a..876a1f3 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H1SeekableInputStream.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H1SeekableInputStream.java
@@ -20,32 +20,23 @@
package org.apache.parquet.hadoop.util;
import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.parquet.io.SeekableInputStream;
-import java.io.EOFException;
+import org.apache.parquet.io.DelegatingSeekableInputStream;
import java.io.IOException;
-import java.nio.ByteBuffer;
/**
* SeekableInputStream implementation that implements read(ByteBuffer) for
* Hadoop 1 FSDataInputStream.
*/
-class H1SeekableInputStream extends SeekableInputStream {
-
- private final int COPY_BUFFER_SIZE = 8192;
- private final byte[] temp = new byte[COPY_BUFFER_SIZE];
+class H1SeekableInputStream extends DelegatingSeekableInputStream {
private final FSDataInputStream stream;
public H1SeekableInputStream(FSDataInputStream stream) {
+ super(stream);
this.stream = stream;
}
@Override
- public void close() throws IOException {
- stream.close();
- }
-
- @Override
public long getPos() throws IOException {
return stream.getPos();
}
@@ -56,16 +47,6 @@ class H1SeekableInputStream extends SeekableInputStream {
}
@Override
- public int read() throws IOException {
- return stream.read();
- }
-
- @Override
- public int read(byte[] b, int off, int len) throws IOException {
- return stream.read(b, off, len);
- }
-
- @Override
public void readFully(byte[] bytes) throws IOException {
stream.readFully(bytes, 0, bytes.length);
}
@@ -75,80 +56,4 @@ class H1SeekableInputStream extends SeekableInputStream {
stream.readFully(bytes);
}
- @Override
- public int read(ByteBuffer buf) throws IOException {
- if (buf.hasArray()) {
- return readHeapBuffer(stream, buf);
- } else {
- return readDirectBuffer(stream, buf, temp);
- }
- }
-
- @Override
- public void readFully(ByteBuffer buf) throws IOException {
- if (buf.hasArray()) {
- readFullyHeapBuffer(stream, buf);
- } else {
- readFullyDirectBuffer(stream, buf, temp);
- }
- }
-
- // Visible for testing
- static int readHeapBuffer(FSDataInputStream f, ByteBuffer buf) throws IOException {
- int bytesRead = f.read(buf.array(), buf.arrayOffset() + buf.position(), buf.remaining());
- if (bytesRead < 0) {
- // if this resulted in EOF, don't update position
- return bytesRead;
- } else {
- buf.position(buf.position() + bytesRead);
- return bytesRead;
- }
- }
-
- // Visible for testing
- static void readFullyHeapBuffer(FSDataInputStream f, ByteBuffer buf) throws IOException {
- f.readFully(buf.array(), buf.arrayOffset() + buf.position(), buf.remaining());
- buf.position(buf.limit());
- }
-
- // Visible for testing
- static int readDirectBuffer(FSDataInputStream f, ByteBuffer buf, byte[] temp) throws IOException {
- // copy all the bytes that return immediately, stopping at the first
- // read that doesn't return a full buffer.
- int nextReadLength = Math.min(buf.remaining(), temp.length);
- int totalBytesRead = 0;
- int bytesRead;
-
- while ((bytesRead = f.read(temp, 0, nextReadLength)) == temp.length) {
- buf.put(temp);
- totalBytesRead += bytesRead;
- nextReadLength = Math.min(buf.remaining(), temp.length);
- }
-
- if (bytesRead < 0) {
- // return -1 if nothing was read
- return totalBytesRead == 0 ? -1 : totalBytesRead;
- } else {
- // copy the last partial buffer
- buf.put(temp, 0, bytesRead);
- totalBytesRead += bytesRead;
- return totalBytesRead;
- }
- }
-
- // Visible for testing
- static void readFullyDirectBuffer(FSDataInputStream f, ByteBuffer buf, byte[] temp) throws IOException {
- int nextReadLength = Math.min(buf.remaining(), temp.length);
- int bytesRead = 0;
-
- while (nextReadLength > 0 && (bytesRead = f.read(temp, 0, nextReadLength)) >= 0) {
- buf.put(temp, 0, bytesRead);
- nextReadLength = Math.min(buf.remaining(), temp.length);
- }
-
- if (bytesRead < 0 && buf.remaining() > 0) {
- throw new EOFException(
- "Reached the end of stream. Still have: " + buf.remaining() + " bytes left");
- }
- }
}
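With the ByteBuffer copy loops moved into DelegatingSeekableInputStream, a wrapper only supplies position handling. A hedged sketch, with an illustrative class name that is not part of this commit:

    import java.io.IOException;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.parquet.io.DelegatingSeekableInputStream;

    class SketchSeekableInputStream extends DelegatingSeekableInputStream {
      private final FSDataInputStream stream;

      SketchSeekableInputStream(FSDataInputStream stream) {
        super(stream);   // the base class now provides read(), read(byte[], ...), and the ByteBuffer reads
        this.stream = stream;
      }

      @Override
      public long getPos() throws IOException {
        return stream.getPos();
      }

      @Override
      public void seek(long newPos) throws IOException {
        stream.seek(newPos);
      }
    }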
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H2SeekableInputStream.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H2SeekableInputStream.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H2SeekableInputStream.java
index ec4567e..c68f6b6 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H2SeekableInputStream.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H2SeekableInputStream.java
@@ -20,7 +20,7 @@
package org.apache.parquet.hadoop.util;
import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.parquet.io.SeekableInputStream;
+import org.apache.parquet.io.DelegatingSeekableInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -29,7 +29,7 @@ import java.nio.ByteBuffer;
* SeekableInputStream implementation for FSDataInputStream that implements
* ByteBufferReadable in Hadoop 2.
*/
-class H2SeekableInputStream extends SeekableInputStream {
+class H2SeekableInputStream extends DelegatingSeekableInputStream {
// Visible for testing
interface Reader {
@@ -40,6 +40,7 @@ class H2SeekableInputStream extends SeekableInputStream {
private final Reader reader;
public H2SeekableInputStream(FSDataInputStream stream) {
+ super(stream);
this.stream = stream;
this.reader = new H2Reader();
}
@@ -60,21 +61,6 @@ class H2SeekableInputStream extends SeekableInputStream {
}
@Override
- public int read() throws IOException {
- return stream.read();
- }
-
- @Override
- public int read(byte[] b, int off, int len) throws IOException {
- return stream.read(b, off, len);
- }
-
- @Override
- public void readFully(byte[] bytes) throws IOException {
- stream.readFully(bytes, 0, bytes.length);
- }
-
- @Override
public void readFully(byte[] bytes, int start, int len) throws IOException {
stream.readFully(bytes, start, len);
}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopCodecs.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopCodecs.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopCodecs.java
new file mode 100644
index 0000000..a46c8db
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopCodecs.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.hadoop.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.bytes.ByteBufferAllocator;
+import org.apache.parquet.compression.CompressionCodecFactory;
+import org.apache.parquet.hadoop.CodecFactory;
+
+public class HadoopCodecs {
+ public static CompressionCodecFactory newFactory(int sizeHint) {
+ return new CodecFactory(new Configuration(), sizeHint);
+ }
+
+ public static CompressionCodecFactory newFactory(Configuration conf, int sizeHint) {
+ return new CodecFactory(conf, sizeHint);
+ }
+
+ public static CompressionCodecFactory newDirectFactory(Configuration conf, ByteBufferAllocator allocator, int sizeHint) {
+ return CodecFactory.createDirectCodecFactory(conf, allocator, sizeHint);
+ }
+}
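A hedged usage sketch for the new HadoopCodecs entry points; the size hint, codec choice, and the CompressionCodecFactory accessor names are assumptions for illustration.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.parquet.compression.CompressionCodecFactory;
    import org.apache.parquet.hadoop.metadata.CompressionCodecName;
    import org.apache.parquet.hadoop.util.HadoopCodecs;

    // fragment: sizeHint is the expected page size used when sizing codec buffers
    CompressionCodecFactory codecs = HadoopCodecs.newFactory(new Configuration(), 1024 * 1024);
    CompressionCodecFactory.BytesInputCompressor compressor =
        codecs.getCompressor(CompressionCodecName.SNAPPY);
    // ... compress page bytes with the compressor ...
    codecs.release();   // frees any pooled Hadoop (de)compressors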
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopOutputFile.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopOutputFile.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopOutputFile.java
new file mode 100644
index 0000000..4740fd4
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopOutputFile.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.hadoop.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.io.OutputFile;
+import org.apache.parquet.io.PositionOutputStream;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+public class HadoopOutputFile implements OutputFile {
+ // need to supply a buffer size when setting block size. this is the default
+ // for hadoop 1 to present. copying it avoids loading DFSConfigKeys.
+ private static final int DFS_BUFFER_SIZE_DEFAULT = 4096;
+
+ private static final Set<String> BLOCK_FS_SCHEMES = new HashSet<String>();
+ static {
+ BLOCK_FS_SCHEMES.add("hdfs");
+ BLOCK_FS_SCHEMES.add("webhdfs");
+ BLOCK_FS_SCHEMES.add("viewfs");
+ }
+
+ // visible for testing
+ public static Set<String> getBlockFileSystems() {
+ return BLOCK_FS_SCHEMES;
+ }
+
+ private static boolean supportsBlockSize(FileSystem fs) {
+ return BLOCK_FS_SCHEMES.contains(fs.getUri().getScheme());
+ }
+
+ private final FileSystem fs;
+ private final Path path;
+ private final Configuration conf;
+
+ public static HadoopOutputFile fromPath(Path path, Configuration conf)
+ throws IOException {
+ FileSystem fs = path.getFileSystem(conf);
+ return new HadoopOutputFile(fs, fs.makeQualified(path), conf);
+ }
+
+ private HadoopOutputFile(FileSystem fs, Path path, Configuration conf) {
+ this.fs = fs;
+ this.path = path;
+ this.conf = conf;
+ }
+
+ public Configuration getConfiguration() {
+ return conf;
+ }
+
+ @Override
+ public PositionOutputStream create(long blockSizeHint) throws IOException {
+ return HadoopStreams.wrap(fs.create(path, false /* do not overwrite */,
+ DFS_BUFFER_SIZE_DEFAULT, fs.getDefaultReplication(path),
+ Math.max(fs.getDefaultBlockSize(path), blockSizeHint)));
+ }
+
+ @Override
+ public PositionOutputStream createOrOverwrite(long blockSizeHint) throws IOException {
+ return HadoopStreams.wrap(fs.create(path, true /* overwrite if exists */,
+ DFS_BUFFER_SIZE_DEFAULT, fs.getDefaultReplication(path),
+ Math.max(fs.getDefaultBlockSize(path), blockSizeHint)));
+ }
+
+ @Override
+ public boolean supportsBlockSize() {
+ return supportsBlockSize(fs);
+ }
+
+ @Override
+ public long defaultBlockSize() {
+ return fs.getDefaultBlockSize(path);
+ }
+
+ @Override
+ public String toString() {
+ return path.toString();
+ }
+}
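A hedged sketch of the OutputFile abstraction above used in isolation; the path and the bytes written are illustrative.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.hadoop.util.HadoopOutputFile;
    import org.apache.parquet.io.OutputFile;
    import org.apache.parquet.io.PositionOutputStream;

    public class HadoopOutputFileSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        OutputFile out = HadoopOutputFile.fromPath(new Path("/tmp/sketch.parquet"), conf);
        try (PositionOutputStream stream = out.createOrOverwrite(out.defaultBlockSize())) {
          stream.write(new byte[] { 'P', 'A', 'R', '1' });               // illustrative payload
          System.out.println("position after write: " + stream.getPos()); // the tracking ParquetFileWriter relies on
        }
      }
    }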
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopPositionOutputStream.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopPositionOutputStream.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopPositionOutputStream.java
new file mode 100644
index 0000000..4b194aa
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopPositionOutputStream.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.hadoop.util;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.parquet.io.PositionOutputStream;
+import java.io.IOException;
+
+public class HadoopPositionOutputStream extends PositionOutputStream {
+ private final FSDataOutputStream wrapped;
+
+ HadoopPositionOutputStream(FSDataOutputStream wrapped) {
+ this.wrapped = wrapped;
+ }
+
+ @Override
+ public long getPos() throws IOException {
+ return wrapped.getPos();
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ wrapped.write(b);
+ }
+
+ @Override
+ public void write(byte[] b) throws IOException {
+ wrapped.write(b);
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ wrapped.write(b, off, len);
+ }
+
+ public void sync() throws IOException {
+ wrapped.hsync();
+ }
+
+ @Override
+ public void flush() throws IOException {
+ wrapped.flush();
+ }
+
+ @Override
+ public void close() throws IOException {
+ wrapped.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java
index 8731bd6..c35e98f 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java
@@ -20,8 +20,11 @@
package org.apache.parquet.hadoop.util;
import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.parquet.Preconditions;
import org.apache.parquet.io.ParquetDecodingException;
import org.apache.parquet.io.SeekableInputStream;
+import org.apache.parquet.io.PositionOutputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,6 +49,7 @@ public class HadoopStreams {
* @return a SeekableInputStream
*/
public static SeekableInputStream wrap(FSDataInputStream stream) {
+ Preconditions.checkNotNull(stream, "Cannot wrap a null input stream");
if (byteBufferReadableClass != null && h2SeekableConstructor != null &&
byteBufferReadableClass.isInstance(stream.getWrappedStream())) {
try {
@@ -99,4 +103,15 @@ public class HadoopStreams {
return null;
}
+ /**
+ * Wraps a {@link FSDataOutputStream} in a {@link PositionOutputStream}
+ * implementation for Parquet writers.
+ *
+ * @param stream a Hadoop FSDataOutputStream
+ * @return a PositionOutputStream
+ */
+ public static PositionOutputStream wrap(FSDataOutputStream stream) {
+ Preconditions.checkNotNull(stream, "Cannot wrap a null output stream");
+ return new HadoopPositionOutputStream(stream);
+ }
}
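A brief hedged fragment for the output-side wrap() added above; the path and filesystem lookup are illustrative.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.io.PositionOutputStream;

    // fragment
    Path path = new Path("/tmp/sketch.bin");
    FSDataOutputStream hadoopOut = path.getFileSystem(new Configuration()).create(path);
    PositionOutputStream out = HadoopStreams.wrap(hadoopOut);   // delegates writes and exposes getPos()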
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestInputOutputFormatWithPadding.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestInputOutputFormatWithPadding.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestInputOutputFormatWithPadding.java
index 0ac9c0f..8e3e6c7 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestInputOutputFormatWithPadding.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestInputOutputFormatWithPadding.java
@@ -35,6 +35,8 @@ import org.apache.parquet.hadoop.example.GroupReadSupport;
import org.apache.parquet.hadoop.example.GroupWriteSupport;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.util.HadoopOutputFile;
+import org.apache.parquet.io.OutputFile;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Types;
@@ -105,7 +107,7 @@ public class TestInputOutputFormatWithPadding {
@Test
public void testBasicBehaviorWithPadding() throws Exception {
- ParquetFileWriter.BLOCK_FS_SCHEMES.add("file");
+ HadoopOutputFile.getBlockFileSystems().add("file");
File inputFile = temp.newFile();
FileOutputStream out = new FileOutputStream(inputFile);
@@ -186,7 +188,7 @@ public class TestInputOutputFormatWithPadding {
Assert.assertEquals("Should match written file content",
FILE_CONTENT, reconstructed);
- ParquetFileWriter.BLOCK_FS_SCHEMES.remove("file");
+ HadoopOutputFile.getBlockFileSystems().remove("file");
}
private void waitForJob(Job job) throws Exception {
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
index 1442e04..6915c86 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.parquet.Version;
import org.apache.parquet.bytes.BytesUtils;
import org.apache.parquet.hadoop.ParquetOutputFormat.JobSummaryLevel;
+import org.apache.parquet.hadoop.util.HadoopOutputFile;
import org.junit.Assume;
import org.junit.Rule;
import org.junit.Test;
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/MockHadoopInputStream.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/MockHadoopInputStream.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/MockHadoopInputStream.java
new file mode 100644
index 0000000..b41b3c8
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/MockHadoopInputStream.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.hadoop.util;
+
+import org.apache.hadoop.fs.PositionedReadable;
+import org.apache.hadoop.fs.Seekable;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+
+class MockHadoopInputStream extends ByteArrayInputStream
+ implements Seekable, PositionedReadable {
+ static final byte[] TEST_ARRAY = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
+
+ private int[] lengths;
+ private int current = 0;
+ MockHadoopInputStream(int... actualReadLengths) {
+ super(TEST_ARRAY);
+ this.lengths = actualReadLengths;
+ }
+
+ @Override
+ public synchronized int read(byte[] b, int off, int len) {
+ if (current < lengths.length) {
+ if (len <= lengths[current]) {
+ // when len == lengths[current], the next read will be 0 bytes
+ int bytesRead = super.read(b, off, len);
+ lengths[current] -= bytesRead;
+ return bytesRead;
+ } else {
+ int bytesRead = super.read(b, off, lengths[current]);
+ current += 1;
+ return bytesRead;
+ }
+ } else {
+ return super.read(b, off, len);
+ }
+ }
+
+ @Override
+ public int read(long position, byte[] buffer, int offset, int length) throws IOException {
+ seek(position);
+ return read(buffer, offset, length);
+ }
+
+ @Override
+ public void readFully(long position, byte[] buffer, int offset, int length) throws IOException {
+ throw new UnsupportedOperationException("Not actually supported.");
+ }
+
+ @Override
+ public void readFully(long position, byte[] buffer) throws IOException {
+ throw new UnsupportedOperationException("Not actually supported.");
+ }
+
+ @Override
+ public void seek(long pos) throws IOException {
+ this.pos = (int) pos;
+ }
+
+ @Override
+ public long getPos() throws IOException {
+ return this.pos;
+ }
+
+ @Override
+ public boolean seekToNewSource(long targetPos) throws IOException {
+ seek(targetPos);
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/MockInputStream.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/MockInputStream.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/MockInputStream.java
deleted file mode 100644
index a112288..0000000
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/MockInputStream.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.parquet.hadoop.util;
-
-import org.apache.hadoop.fs.PositionedReadable;
-import org.apache.hadoop.fs.Seekable;
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-
-class MockInputStream extends ByteArrayInputStream
- implements Seekable, PositionedReadable {
- static final byte[] TEST_ARRAY = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
-
- private int[] lengths;
- private int current = 0;
- MockInputStream(int... actualReadLengths) {
- super(TEST_ARRAY);
- this.lengths = actualReadLengths;
- }
-
- @Override
- public synchronized int read(byte[] b, int off, int len) {
- if (current < lengths.length) {
- if (len <= lengths[current]) {
- // when len == lengths[current], the next read will by 0 bytes
- int bytesRead = super.read(b, off, len);
- lengths[current] -= bytesRead;
- return bytesRead;
- } else {
- int bytesRead = super.read(b, off, lengths[current]);
- current += 1;
- return bytesRead;
- }
- } else {
- return super.read(b, off, len);
- }
- }
-
- @Override
- public int read(long position, byte[] buffer, int offset, int length) throws IOException {
- seek(position);
- return read(buffer, offset, length);
- }
-
- @Override
- public void readFully(long position, byte[] buffer, int offset, int length) throws IOException {
- throw new UnsupportedOperationException("Not actually supported.");
- }
-
- @Override
- public void readFully(long position, byte[] buffer) throws IOException {
- throw new UnsupportedOperationException("Not actually supported.");
- }
-
- @Override
- public void seek(long pos) throws IOException {
- this.pos = (int) pos;
- }
-
- @Override
- public long getPos() throws IOException {
- return this.pos;
- }
-
- @Override
- public boolean seekToNewSource(long targetPos) throws IOException {
- seek(targetPos);
- return true;
- }
-}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoop1ByteBufferReads.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoop1ByteBufferReads.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoop1ByteBufferReads.java
deleted file mode 100644
index 9e4e2a9..0000000
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoop1ByteBufferReads.java
+++ /dev/null
@@ -1,761 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.parquet.hadoop.util;
-
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.parquet.hadoop.TestUtils;
-import org.junit.Assert;
-import org.junit.Test;
-import java.io.EOFException;
-import java.nio.ByteBuffer;
-import java.util.concurrent.Callable;
-
-import static org.apache.parquet.hadoop.util.MockInputStream.TEST_ARRAY;
-
-public class TestHadoop1ByteBufferReads {
-
- private static final ThreadLocal<byte[]> TEMP = new ThreadLocal<byte[]>() {
- @Override
- protected byte[] initialValue() {
- return new byte[8192];
- }
- };
-
- @Test
- public void testHeapRead() throws Exception {
- ByteBuffer readBuffer = ByteBuffer.allocate(20);
-
- FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream());
-
- int len = H1SeekableInputStream.readHeapBuffer(hadoopStream, readBuffer);
- Assert.assertEquals(10, len);
- Assert.assertEquals(10, readBuffer.position());
- Assert.assertEquals(20, readBuffer.limit());
-
- len = H1SeekableInputStream.readHeapBuffer(hadoopStream, readBuffer);
- Assert.assertEquals(-1, len);
-
- readBuffer.flip();
- Assert.assertEquals("Buffer contents should match",
- ByteBuffer.wrap(TEST_ARRAY), readBuffer);
- }
-
- @Test
- public void testHeapSmallBuffer() throws Exception {
- ByteBuffer readBuffer = ByteBuffer.allocate(5);
-
- FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream());
-
- int len = H1SeekableInputStream.readHeapBuffer(hadoopStream, readBuffer);
- Assert.assertEquals(5, len);
- Assert.assertEquals(5, readBuffer.position());
- Assert.assertEquals(5, readBuffer.limit());
-
- len = H1SeekableInputStream.readHeapBuffer(hadoopStream, readBuffer);
- Assert.assertEquals(0, len);
-
- readBuffer.flip();
- Assert.assertEquals("Buffer contents should match",
- ByteBuffer.wrap(TEST_ARRAY, 0, 5), readBuffer);
- }
-
- @Test
- public void testHeapSmallReads() throws Exception {
- ByteBuffer readBuffer = ByteBuffer.allocate(10);
-
- FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(2, 3, 3));
-
- int len = H1SeekableInputStream.readHeapBuffer(hadoopStream, readBuffer);
- Assert.assertEquals(2, len);
- Assert.assertEquals(2, readBuffer.position());
- Assert.assertEquals(10, readBuffer.limit());
-
- len = H1SeekableInputStream.readHeapBuffer(hadoopStream, readBuffer);
- Assert.assertEquals(3, len);
- Assert.assertEquals(5, readBuffer.position());
- Assert.assertEquals(10, readBuffer.limit());
-
- len = H1SeekableInputStream.readHeapBuffer(hadoopStream, readBuffer);
- Assert.assertEquals(3, len);
- Assert.assertEquals(8, readBuffer.position());
- Assert.assertEquals(10, readBuffer.limit());
-
- len = H1SeekableInputStream.readHeapBuffer(hadoopStream, readBuffer);
- Assert.assertEquals(2, len);
- Assert.assertEquals(10, readBuffer.position());
- Assert.assertEquals(10, readBuffer.limit());
-
- readBuffer.flip();
- Assert.assertEquals("Buffer contents should match",
- ByteBuffer.wrap(TEST_ARRAY), readBuffer);
- }
-
- @Test
- public void testHeapPosition() throws Exception {
- ByteBuffer readBuffer = ByteBuffer.allocate(20);
- readBuffer.position(10);
- readBuffer.mark();
-
- FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(8));
-
- int len = H1SeekableInputStream.readHeapBuffer(hadoopStream, readBuffer);
- Assert.assertEquals(8, len);
- Assert.assertEquals(18, readBuffer.position());
- Assert.assertEquals(20, readBuffer.limit());
-
- len = H1SeekableInputStream.readHeapBuffer(hadoopStream, readBuffer);
- Assert.assertEquals(2, len);
- Assert.assertEquals(20, readBuffer.position());
- Assert.assertEquals(20, readBuffer.limit());
-
- len = H1SeekableInputStream.readHeapBuffer(hadoopStream, readBuffer);
- Assert.assertEquals(-1, len);
-
- readBuffer.reset();
- Assert.assertEquals("Buffer contents should match",
- ByteBuffer.wrap(TEST_ARRAY), readBuffer);
- }
-
- @Test
- public void testHeapLimit() throws Exception {
- ByteBuffer readBuffer = ByteBuffer.allocate(20);
- readBuffer.limit(8);
-
- FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(7));
-
- int len = H1SeekableInputStream.readHeapBuffer(hadoopStream, readBuffer);
- Assert.assertEquals(7, len);
- Assert.assertEquals(7, readBuffer.position());
- Assert.assertEquals(8, readBuffer.limit());
-
- len = H1SeekableInputStream.readHeapBuffer(hadoopStream, readBuffer);
- Assert.assertEquals(1, len);
- Assert.assertEquals(8, readBuffer.position());
- Assert.assertEquals(8, readBuffer.limit());
-
- len = H1SeekableInputStream.readHeapBuffer(hadoopStream, readBuffer);
- Assert.assertEquals(0, len);
-
- readBuffer.flip();
- Assert.assertEquals("Buffer contents should match",
- ByteBuffer.wrap(TEST_ARRAY, 0, 8), readBuffer);
- }
-
- @Test
- public void testHeapPositionAndLimit() throws Exception {
- ByteBuffer readBuffer = ByteBuffer.allocate(20);
- readBuffer.position(5);
- readBuffer.limit(13);
- readBuffer.mark();
-
- FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(7));
-
- int len = H1SeekableInputStream.readHeapBuffer(hadoopStream, readBuffer);
- Assert.assertEquals(7, len);
- Assert.assertEquals(12, readBuffer.position());
- Assert.assertEquals(13, readBuffer.limit());
-
- len = H1SeekableInputStream.readHeapBuffer(hadoopStream, readBuffer);
- Assert.assertEquals(1, len);
- Assert.assertEquals(13, readBuffer.position());
- Assert.assertEquals(13, readBuffer.limit());
-
- len = H1SeekableInputStream.readHeapBuffer(hadoopStream, readBuffer);
- Assert.assertEquals(0, len);
-
- readBuffer.reset();
- Assert.assertEquals("Buffer contents should match",
- ByteBuffer.wrap(TEST_ARRAY, 0, 8), readBuffer);
- }
-
- @Test
- public void testDirectRead() throws Exception {
- ByteBuffer readBuffer = ByteBuffer.allocateDirect(20);
-
- FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream());
-
- int len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, TEMP.get());
- Assert.assertEquals(10, len);
- Assert.assertEquals(10, readBuffer.position());
- Assert.assertEquals(20, readBuffer.limit());
-
- len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, TEMP.get());
- Assert.assertEquals(-1, len);
-
- readBuffer.flip();
- Assert.assertEquals("Buffer contents should match",
- ByteBuffer.wrap(TEST_ARRAY), readBuffer);
- }
-
- @Test
- public void testDirectSmallBuffer() throws Exception {
- ByteBuffer readBuffer = ByteBuffer.allocateDirect(5);
-
- FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream());
-
- int len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, TEMP.get());
- Assert.assertEquals(5, len);
- Assert.assertEquals(5, readBuffer.position());
- Assert.assertEquals(5, readBuffer.limit());
-
- len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, TEMP.get());
- Assert.assertEquals(0, len);
-
- readBuffer.flip();
- Assert.assertEquals("Buffer contents should match",
- ByteBuffer.wrap(TEST_ARRAY, 0, 5), readBuffer);
- }
-
- @Test
- public void testDirectSmallReads() throws Exception {
- ByteBuffer readBuffer = ByteBuffer.allocateDirect(10);
-
- FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(2, 3, 3));
-
- int len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, TEMP.get());
- Assert.assertEquals(2, len);
- Assert.assertEquals(2, readBuffer.position());
- Assert.assertEquals(10, readBuffer.limit());
-
- len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, TEMP.get());
- Assert.assertEquals(3, len);
- Assert.assertEquals(5, readBuffer.position());
- Assert.assertEquals(10, readBuffer.limit());
-
- len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, TEMP.get());
- Assert.assertEquals(3, len);
- Assert.assertEquals(8, readBuffer.position());
- Assert.assertEquals(10, readBuffer.limit());
-
- len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, TEMP.get());
- Assert.assertEquals(2, len);
- Assert.assertEquals(10, readBuffer.position());
- Assert.assertEquals(10, readBuffer.limit());
-
- readBuffer.flip();
- Assert.assertEquals("Buffer contents should match",
- ByteBuffer.wrap(TEST_ARRAY), readBuffer);
- }
-
- @Test
- public void testDirectPosition() throws Exception {
- ByteBuffer readBuffer = ByteBuffer.allocateDirect(20);
- readBuffer.position(10);
- readBuffer.mark();
-
- FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(8));
-
- int len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, TEMP.get());
- Assert.assertEquals(8, len);
- Assert.assertEquals(18, readBuffer.position());
- Assert.assertEquals(20, readBuffer.limit());
-
- len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, TEMP.get());
- Assert.assertEquals(2, len);
- Assert.assertEquals(20, readBuffer.position());
- Assert.assertEquals(20, readBuffer.limit());
-
- len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, TEMP.get());
- Assert.assertEquals(-1, len);
-
- readBuffer.reset();
- Assert.assertEquals("Buffer contents should match",
- ByteBuffer.wrap(TEST_ARRAY), readBuffer);
- }
-
- @Test
- public void testDirectLimit() throws Exception {
- ByteBuffer readBuffer = ByteBuffer.allocate(20);
- readBuffer.limit(8);
-
- FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(7));
-
- int len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, TEMP.get());
- Assert.assertEquals(7, len);
- Assert.assertEquals(7, readBuffer.position());
- Assert.assertEquals(8, readBuffer.limit());
-
- len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, TEMP.get());
- Assert.assertEquals(1, len);
- Assert.assertEquals(8, readBuffer.position());
- Assert.assertEquals(8, readBuffer.limit());
-
- len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, TEMP.get());
- Assert.assertEquals(0, len);
-
- readBuffer.flip();
- Assert.assertEquals("Buffer contents should match",
- ByteBuffer.wrap(TEST_ARRAY, 0, 8), readBuffer);
- }
-
- @Test
- public void testDirectPositionAndLimit() throws Exception {
- ByteBuffer readBuffer = ByteBuffer.allocateDirect(20);
- readBuffer.position(5);
- readBuffer.limit(13);
- readBuffer.mark();
-
- FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(7));
-
- int len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, TEMP.get());
- Assert.assertEquals(7, len);
- Assert.assertEquals(12, readBuffer.position());
- Assert.assertEquals(13, readBuffer.limit());
-
- len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, TEMP.get());
- Assert.assertEquals(1, len);
- Assert.assertEquals(13, readBuffer.position());
- Assert.assertEquals(13, readBuffer.limit());
-
- len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, TEMP.get());
- Assert.assertEquals(0, len);
-
- readBuffer.reset();
- Assert.assertEquals("Buffer contents should match",
- ByteBuffer.wrap(TEST_ARRAY, 0, 8), readBuffer);
- }
-
- @Test
- public void testDirectSmallTempBufferSmallReads() throws Exception {
- byte[] temp = new byte[2]; // this will cause readDirectBuffer to loop
-
- ByteBuffer readBuffer = ByteBuffer.allocateDirect(10);
-
- FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(2, 3, 3));
-
- int len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, temp);
- Assert.assertEquals(2, len);
- Assert.assertEquals(2, readBuffer.position());
- Assert.assertEquals(10, readBuffer.limit());
-
- len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, temp);
- Assert.assertEquals(3, len);
- Assert.assertEquals(5, readBuffer.position());
- Assert.assertEquals(10, readBuffer.limit());
-
- len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, temp);
- Assert.assertEquals(3, len);
- Assert.assertEquals(8, readBuffer.position());
- Assert.assertEquals(10, readBuffer.limit());
-
- len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, temp);
- Assert.assertEquals(2, len);
- Assert.assertEquals(10, readBuffer.position());
- Assert.assertEquals(10, readBuffer.limit());
-
- len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, temp);
- Assert.assertEquals(-1, len);
-
- readBuffer.flip();
- Assert.assertEquals("Buffer contents should match",
- ByteBuffer.wrap(TEST_ARRAY), readBuffer);
- }
-
- @Test
- public void testDirectSmallTempBufferWithPositionAndLimit() throws Exception {
- byte[] temp = new byte[2]; // this will cause readDirectBuffer to loop
-
- ByteBuffer readBuffer = ByteBuffer.allocateDirect(20);
- readBuffer.position(5);
- readBuffer.limit(13);
- readBuffer.mark();
-
- FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(7));
-
- int len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, temp);
- Assert.assertEquals(7, len);
- Assert.assertEquals(12, readBuffer.position());
- Assert.assertEquals(13, readBuffer.limit());
-
- len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, temp);
- Assert.assertEquals(1, len);
- Assert.assertEquals(13, readBuffer.position());
- Assert.assertEquals(13, readBuffer.limit());
-
- len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, temp);
- Assert.assertEquals(0, len);
-
- readBuffer.reset();
- Assert.assertEquals("Buffer contents should match",
- ByteBuffer.wrap(TEST_ARRAY, 0, 8), readBuffer);
- }
-
- @Test
- public void testHeapReadFullySmallBuffer() throws Exception {
- ByteBuffer readBuffer = ByteBuffer.allocate(8);
-
- FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream());
-
- H1SeekableInputStream.readFullyHeapBuffer(hadoopStream, readBuffer);
- Assert.assertEquals(8, readBuffer.position());
- Assert.assertEquals(8, readBuffer.limit());
-
- H1SeekableInputStream.readFullyHeapBuffer(hadoopStream, readBuffer);
- Assert.assertEquals(8, readBuffer.position());
- Assert.assertEquals(8, readBuffer.limit());
-
- readBuffer.flip();
- Assert.assertEquals("Buffer contents should match",
- ByteBuffer.wrap(TEST_ARRAY, 0, 8), readBuffer);
- }
-
- @Test
- public void testHeapReadFullyLargeBuffer() throws Exception {
- final ByteBuffer readBuffer = ByteBuffer.allocate(20);
-
- final FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream());
-
- TestUtils.assertThrows("Should throw EOFException",
- EOFException.class, new Callable() {
- @Override
- public Object call() throws Exception {
- H1SeekableInputStream.readFullyHeapBuffer(hadoopStream, readBuffer);
- return null;
- }
- });
-
- Assert.assertEquals(0, readBuffer.position());
- Assert.assertEquals(20, readBuffer.limit());
- }
-
- @Test
- public void testHeapReadFullyJustRight() throws Exception {
- final ByteBuffer readBuffer = ByteBuffer.allocate(10);
-
- final FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream());
-
- // reads all of the bytes available without EOFException
- H1SeekableInputStream.readFullyHeapBuffer(hadoopStream, readBuffer);
- Assert.assertEquals(10, readBuffer.position());
- Assert.assertEquals(10, readBuffer.limit());
-
- // trying to read 0 more bytes doesn't result in EOFException
- H1SeekableInputStream.readFullyHeapBuffer(hadoopStream, readBuffer);
- Assert.assertEquals(10, readBuffer.position());
- Assert.assertEquals(10, readBuffer.limit());
-
- readBuffer.flip();
- Assert.assertEquals("Buffer contents should match",
- ByteBuffer.wrap(TEST_ARRAY), readBuffer);
- }
-
- @Test
- public void testHeapReadFullySmallReads() throws Exception {
- final ByteBuffer readBuffer = ByteBuffer.allocate(10);
-
- final FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(2, 3, 3));
-
- H1SeekableInputStream.readFullyHeapBuffer(hadoopStream, readBuffer);
- Assert.assertEquals(10, readBuffer.position());
- Assert.assertEquals(10, readBuffer.limit());
-
- H1SeekableInputStream.readFullyHeapBuffer(hadoopStream, readBuffer);
- Assert.assertEquals(10, readBuffer.position());
- Assert.assertEquals(10, readBuffer.limit());
-
- readBuffer.flip();
- Assert.assertEquals("Buffer contents should match",
- ByteBuffer.wrap(TEST_ARRAY), readBuffer);
- }
-
- @Test
- public void testHeapReadFullyPosition() throws Exception {
- final ByteBuffer readBuffer = ByteBuffer.allocate(10);
- readBuffer.position(3);
- readBuffer.mark();
-
- final FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(2, 3, 3));
-
- H1SeekableInputStream.readFullyHeapBuffer(hadoopStream, readBuffer);
- Assert.assertEquals(10, readBuffer.position());
- Assert.assertEquals(10, readBuffer.limit());
-
- H1SeekableInputStream.readFullyHeapBuffer(hadoopStream, readBuffer);
- Assert.assertEquals(10, readBuffer.position());
- Assert.assertEquals(10, readBuffer.limit());
-
- readBuffer.reset();
- Assert.assertEquals("Buffer contents should match",
- ByteBuffer.wrap(TEST_ARRAY, 0, 7), readBuffer);
- }
-
- @Test
- public void testHeapReadFullyLimit() throws Exception {
- final ByteBuffer readBuffer = ByteBuffer.allocate(10);
- readBuffer.limit(7);
-
- final FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(2, 3, 3));
-
- H1SeekableInputStream.readFullyHeapBuffer(hadoopStream, readBuffer);
- Assert.assertEquals(7, readBuffer.position());
- Assert.assertEquals(7, readBuffer.limit());
-
- H1SeekableInputStream.readFullyHeapBuffer(hadoopStream, readBuffer);
- Assert.assertEquals(7, readBuffer.position());
- Assert.assertEquals(7, readBuffer.limit());
-
- readBuffer.flip();
- Assert.assertEquals("Buffer contents should match",
- ByteBuffer.wrap(TEST_ARRAY, 0, 7), readBuffer);
-
- readBuffer.position(7);
- readBuffer.limit(10);
- H1SeekableInputStream.readFullyHeapBuffer(hadoopStream, readBuffer);
- Assert.assertEquals(10, readBuffer.position());
- Assert.assertEquals(10, readBuffer.limit());
-
- readBuffer.flip();
- Assert.assertEquals("Buffer contents should match",
- ByteBuffer.wrap(TEST_ARRAY), readBuffer);
- }
-
- @Test
- public void testHeapReadFullyPositionAndLimit() throws Exception {
- final ByteBuffer readBuffer = ByteBuffer.allocate(10);
- readBuffer.position(3);
- readBuffer.limit(7);
- readBuffer.mark();
-
- final FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(2, 3, 3));
-
- H1SeekableInputStream.readFullyHeapBuffer(hadoopStream, readBuffer);
- Assert.assertEquals(7, readBuffer.position());
- Assert.assertEquals(7, readBuffer.limit());
-
- H1SeekableInputStream.readFullyHeapBuffer(hadoopStream, readBuffer);
- Assert.assertEquals(7, readBuffer.position());
- Assert.assertEquals(7, readBuffer.limit());
-
- readBuffer.reset();
- Assert.assertEquals("Buffer contents should match",
- ByteBuffer.wrap(TEST_ARRAY, 0, 4), readBuffer);
-
- readBuffer.position(7);
- readBuffer.limit(10);
- H1SeekableInputStream.readFullyHeapBuffer(hadoopStream, readBuffer);
- Assert.assertEquals(10, readBuffer.position());
- Assert.assertEquals(10, readBuffer.limit());
-
- readBuffer.reset();
- Assert.assertEquals("Buffer contents should match",
- ByteBuffer.wrap(TEST_ARRAY, 0, 7), readBuffer);
- }
-
- @Test
- public void testDirectReadFullySmallBuffer() throws Exception {
- ByteBuffer readBuffer = ByteBuffer.allocateDirect(8);
-
- FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream());
-
- H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, TEMP.get());
- Assert.assertEquals(8, readBuffer.position());
- Assert.assertEquals(8, readBuffer.limit());
-
- H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, TEMP.get());
- Assert.assertEquals(8, readBuffer.position());
- Assert.assertEquals(8, readBuffer.limit());
-
- readBuffer.flip();
- Assert.assertEquals("Buffer contents should match",
- ByteBuffer.wrap(TEST_ARRAY, 0, 8), readBuffer);
- }
-
- @Test
- public void testDirectReadFullyLargeBuffer() throws Exception {
- final ByteBuffer readBuffer = ByteBuffer.allocateDirect(20);
-
- final FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream());
-
- TestUtils.assertThrows("Should throw EOFException",
- EOFException.class, new Callable() {
- @Override
- public Object call() throws Exception {
- H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, TEMP.get());
- return null;
- }
- });
-
- // NOTE: This behavior differs from readFullyHeapBuffer because direct uses
- // several read operations that will read up to the end of the input. This
- // is a correct value because the bytes in the buffer are valid. This
- // behavior can't be implemented for the heap buffer without using the read
- // method instead of the readFully method on the underlying
- // FSDataInputStream.
- Assert.assertEquals(10, readBuffer.position());
- Assert.assertEquals(20, readBuffer.limit());
- }
-
- @Test
- public void testDirectReadFullyJustRight() throws Exception {
- final ByteBuffer readBuffer = ByteBuffer.allocateDirect(10);
-
- final FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream());
-
- // reads all of the bytes available without EOFException
- H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, TEMP.get());
- Assert.assertEquals(10, readBuffer.position());
- Assert.assertEquals(10, readBuffer.limit());
-
- // trying to read 0 more bytes doesn't result in EOFException
- H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, TEMP.get());
- Assert.assertEquals(10, readBuffer.position());
- Assert.assertEquals(10, readBuffer.limit());
-
- readBuffer.flip();
- Assert.assertEquals("Buffer contents should match",
- ByteBuffer.wrap(TEST_ARRAY), readBuffer);
- }
-
- @Test
- public void testDirectReadFullySmallReads() throws Exception {
- final ByteBuffer readBuffer = ByteBuffer.allocateDirect(10);
-
- final FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(2, 3, 3));
-
- H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, TEMP.get());
- Assert.assertEquals(10, readBuffer.position());
- Assert.assertEquals(10, readBuffer.limit());
-
- H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, TEMP.get());
- Assert.assertEquals(10, readBuffer.position());
- Assert.assertEquals(10, readBuffer.limit());
-
- readBuffer.flip();
- Assert.assertEquals("Buffer contents should match",
- ByteBuffer.wrap(TEST_ARRAY), readBuffer);
- }
-
- @Test
- public void testDirectReadFullyPosition() throws Exception {
- final ByteBuffer readBuffer = ByteBuffer.allocateDirect(10);
- readBuffer.position(3);
- readBuffer.mark();
-
- final FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(2, 3, 3));
-
- H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, TEMP.get());
- Assert.assertEquals(10, readBuffer.position());
- Assert.assertEquals(10, readBuffer.limit());
-
- H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, TEMP.get());
- Assert.assertEquals(10, readBuffer.position());
- Assert.assertEquals(10, readBuffer.limit());
-
- readBuffer.reset();
- Assert.assertEquals("Buffer contents should match",
- ByteBuffer.wrap(TEST_ARRAY, 0, 7), readBuffer);
- }
-
- @Test
- public void testDirectReadFullyLimit() throws Exception {
- final ByteBuffer readBuffer = ByteBuffer.allocateDirect(10);
- readBuffer.limit(7);
-
- final FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(2, 3, 3));
-
- H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, TEMP.get());
- Assert.assertEquals(7, readBuffer.position());
- Assert.assertEquals(7, readBuffer.limit());
-
- H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, TEMP.get());
- Assert.assertEquals(7, readBuffer.position());
- Assert.assertEquals(7, readBuffer.limit());
-
- readBuffer.flip();
- Assert.assertEquals("Buffer contents should match",
- ByteBuffer.wrap(TEST_ARRAY, 0, 7), readBuffer);
-
- readBuffer.position(7);
- readBuffer.limit(10);
- H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, TEMP.get());
- Assert.assertEquals(10, readBuffer.position());
- Assert.assertEquals(10, readBuffer.limit());
-
- readBuffer.flip();
- Assert.assertEquals("Buffer contents should match",
- ByteBuffer.wrap(TEST_ARRAY), readBuffer);
- }
-
- @Test
- public void testDirectReadFullyPositionAndLimit() throws Exception {
- final ByteBuffer readBuffer = ByteBuffer.allocateDirect(10);
- readBuffer.position(3);
- readBuffer.limit(7);
- readBuffer.mark();
-
- final FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(2, 3, 3));
-
- H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, TEMP.get());
- Assert.assertEquals(7, readBuffer.position());
- Assert.assertEquals(7, readBuffer.limit());
-
- H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, TEMP.get());
- Assert.assertEquals(7, readBuffer.position());
- Assert.assertEquals(7, readBuffer.limit());
-
- readBuffer.reset();
- Assert.assertEquals("Buffer contents should match",
- ByteBuffer.wrap(TEST_ARRAY, 0, 4), readBuffer);
-
- readBuffer.position(7);
- readBuffer.limit(10);
- H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, TEMP.get());
- Assert.assertEquals(10, readBuffer.position());
- Assert.assertEquals(10, readBuffer.limit());
-
- readBuffer.reset();
- Assert.assertEquals("Buffer contents should match",
- ByteBuffer.wrap(TEST_ARRAY, 0, 7), readBuffer);
- }
-
- @Test
- public void testDirectReadFullySmallTempBufferWithPositionAndLimit() throws Exception {
- byte[] temp = new byte[2]; // this will cause readFully to loop
-
- final ByteBuffer readBuffer = ByteBuffer.allocateDirect(10);
- readBuffer.position(3);
- readBuffer.limit(7);
- readBuffer.mark();
-
- final FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(2, 3, 3));
-
- H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, temp);
- Assert.assertEquals(7, readBuffer.position());
- Assert.assertEquals(7, readBuffer.limit());
-
- H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, temp);
- Assert.assertEquals(7, readBuffer.position());
- Assert.assertEquals(7, readBuffer.limit());
-
- readBuffer.reset();
- Assert.assertEquals("Buffer contents should match",
- ByteBuffer.wrap(TEST_ARRAY, 0, 4), readBuffer);
-
- readBuffer.position(7);
- readBuffer.limit(10);
- H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, temp);
- Assert.assertEquals(10, readBuffer.position());
- Assert.assertEquals(10, readBuffer.limit());
-
- readBuffer.reset();
- Assert.assertEquals("Buffer contents should match",
- ByteBuffer.wrap(TEST_ARRAY, 0, 7), readBuffer);
- }
-}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoop2ByteBufferReads.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoop2ByteBufferReads.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoop2ByteBufferReads.java
index 86b903c..68c9b3b 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoop2ByteBufferReads.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoop2ByteBufferReads.java
@@ -28,7 +28,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.Callable;
-import static org.apache.parquet.hadoop.util.MockInputStream.TEST_ARRAY;
+import static org.apache.parquet.hadoop.util.MockHadoopInputStream.TEST_ARRAY;
public class TestHadoop2ByteBufferReads {
@@ -59,7 +59,7 @@ public class TestHadoop2ByteBufferReads {
public void testHeapReadFullySmallBuffer() throws Exception {
ByteBuffer readBuffer = ByteBuffer.allocate(8);
- FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream());
+ FSDataInputStream hadoopStream = new FSDataInputStream(new MockHadoopInputStream());
MockBufferReader reader = new MockBufferReader(hadoopStream);
H2SeekableInputStream.readFully(reader, readBuffer);
@@ -79,7 +79,7 @@ public class TestHadoop2ByteBufferReads {
public void testHeapReadFullyLargeBuffer() throws Exception {
final ByteBuffer readBuffer = ByteBuffer.allocate(20);
- FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream());
+ FSDataInputStream hadoopStream = new FSDataInputStream(new MockHadoopInputStream());
final MockBufferReader reader = new MockBufferReader(hadoopStream);
TestUtils.assertThrows("Should throw EOFException",
@@ -105,7 +105,7 @@ public class TestHadoop2ByteBufferReads {
public void testHeapReadFullyJustRight() throws Exception {
ByteBuffer readBuffer = ByteBuffer.allocate(10);
- FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream());
+ FSDataInputStream hadoopStream = new FSDataInputStream(new MockHadoopInputStream());
MockBufferReader reader = new MockBufferReader(hadoopStream);
// reads all of the bytes available without EOFException
@@ -127,7 +127,7 @@ public class TestHadoop2ByteBufferReads {
public void testHeapReadFullySmallReads() throws Exception {
ByteBuffer readBuffer = ByteBuffer.allocate(10);
- FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(2, 3, 3));
+ FSDataInputStream hadoopStream = new FSDataInputStream(new MockHadoopInputStream(2, 3, 3));
MockBufferReader reader = new MockBufferReader(hadoopStream);
H2SeekableInputStream.readFully(reader, readBuffer);
@@ -149,7 +149,7 @@ public class TestHadoop2ByteBufferReads {
readBuffer.position(3);
readBuffer.mark();
- FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(2, 3, 3));
+ FSDataInputStream hadoopStream = new FSDataInputStream(new MockHadoopInputStream(2, 3, 3));
MockBufferReader reader = new MockBufferReader(hadoopStream);
H2SeekableInputStream.readFully(reader, readBuffer);
@@ -170,7 +170,7 @@ public class TestHadoop2ByteBufferReads {
ByteBuffer readBuffer = ByteBuffer.allocate(10);
readBuffer.limit(7);
- FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(2, 3, 3));
+ FSDataInputStream hadoopStream = new FSDataInputStream(new MockHadoopInputStream(2, 3, 3));
MockBufferReader reader = new MockBufferReader(hadoopStream);
H2SeekableInputStream.readFully(reader, readBuffer);
@@ -203,7 +203,7 @@ public class TestHadoop2ByteBufferReads {
readBuffer.limit(7);
readBuffer.mark();
- FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(2, 3, 3));
+ FSDataInputStream hadoopStream = new FSDataInputStream(new MockHadoopInputStream(2, 3, 3));
MockBufferReader reader = new MockBufferReader(hadoopStream);
H2SeekableInputStream.readFully(reader, readBuffer);
@@ -233,7 +233,7 @@ public class TestHadoop2ByteBufferReads {
public void testDirectReadFullySmallBuffer() throws Exception {
ByteBuffer readBuffer = ByteBuffer.allocateDirect(8);
- FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream());
+ FSDataInputStream hadoopStream = new FSDataInputStream(new MockHadoopInputStream());
MockBufferReader reader = new MockBufferReader(hadoopStream);
H2SeekableInputStream.readFully(reader, readBuffer);
@@ -253,7 +253,7 @@ public class TestHadoop2ByteBufferReads {
public void testDirectReadFullyLargeBuffer() throws Exception {
final ByteBuffer readBuffer = ByteBuffer.allocateDirect(20);
- FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream());
+ FSDataInputStream hadoopStream = new FSDataInputStream(new MockHadoopInputStream());
final MockBufferReader reader = new MockBufferReader(hadoopStream);
TestUtils.assertThrows("Should throw EOFException",
@@ -279,7 +279,7 @@ public class TestHadoop2ByteBufferReads {
public void testDirectReadFullyJustRight() throws Exception {
ByteBuffer readBuffer = ByteBuffer.allocateDirect(10);
- FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream());
+ FSDataInputStream hadoopStream = new FSDataInputStream(new MockHadoopInputStream());
MockBufferReader reader = new MockBufferReader(hadoopStream);
// reads all of the bytes available without EOFException
@@ -301,7 +301,7 @@ public class TestHadoop2ByteBufferReads {
public void testDirectReadFullySmallReads() throws Exception {
ByteBuffer readBuffer = ByteBuffer.allocateDirect(10);
- FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(2, 3, 3));
+ FSDataInputStream hadoopStream = new FSDataInputStream(new MockHadoopInputStream(2, 3, 3));
MockBufferReader reader = new MockBufferReader(hadoopStream);
H2SeekableInputStream.readFully(reader, readBuffer);
@@ -323,7 +323,7 @@ public class TestHadoop2ByteBufferReads {
readBuffer.position(3);
readBuffer.mark();
- FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(2, 3, 3));
+ FSDataInputStream hadoopStream = new FSDataInputStream(new MockHadoopInputStream(2, 3, 3));
MockBufferReader reader = new MockBufferReader(hadoopStream);
H2SeekableInputStream.readFully(reader, readBuffer);
@@ -344,7 +344,7 @@ public class TestHadoop2ByteBufferReads {
ByteBuffer readBuffer = ByteBuffer.allocateDirect(10);
readBuffer.limit(7);
- FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(2, 3, 3));
+ FSDataInputStream hadoopStream = new FSDataInputStream(new MockHadoopInputStream(2, 3, 3));
H2SeekableInputStream.Reader reader = new MockBufferReader(hadoopStream);
H2SeekableInputStream.readFully(reader, readBuffer);
@@ -377,7 +377,7 @@ public class TestHadoop2ByteBufferReads {
readBuffer.limit(7);
readBuffer.mark();
- FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(2, 3, 3));
+ FSDataInputStream hadoopStream = new FSDataInputStream(new MockHadoopInputStream(2, 3, 3));
MockBufferReader reader = new MockBufferReader(hadoopStream);
H2SeekableInputStream.readFully(reader, readBuffer);
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-tools/src/main/java/org/apache/parquet/tools/command/MergeCommand.java
----------------------------------------------------------------------
diff --git a/parquet-tools/src/main/java/org/apache/parquet/tools/command/MergeCommand.java b/parquet-tools/src/main/java/org/apache/parquet/tools/command/MergeCommand.java
index 5d79a49..fe64587 100644
--- a/parquet-tools/src/main/java/org/apache/parquet/tools/command/MergeCommand.java
+++ b/parquet-tools/src/main/java/org/apache/parquet/tools/command/MergeCommand.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.hadoop.util.HiddenFileFilter;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.metadata.FileMetaData;
@@ -91,7 +92,7 @@ public class MergeCommand extends ArgsOnlyCommand {
tooSmallFilesMerged = true;
}
- writer.appendFile(conf, input);
+ writer.appendFile(HadoopInputFile.fromPath(input, conf));
}
if (tooSmallFilesMerged) {
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 44b0b62..05e3e47 100644
--- a/pom.xml
+++ b/pom.xml
@@ -262,7 +262,14 @@
<exclude>org/apache/parquet/avro/SpecificDataSupplier</exclude> <!-- made public -->
<exclude>org/apache/parquet/io/ColumnIOFactory$ColumnIOCreatorVisitor</exclude> <!-- removed non-API class -->
<exclude>org/apache/parquet/io/ColumnIOFactory/**</exclude> <!-- removed non-API class and methods-->
- <exclude>org/apache/parquet/hadoop/codec/SnappyCompressor</exclude> <!-- added synchronized modifier -->
+ <exclude>org/apache/parquet/hadoop/codec/SnappyCompressor</exclude> <!-- added synchronized modifier -->
+ <exclude>org/apache/parquet/bytes/BytesInput</exclude> <!-- moved to parquet-common -->
+ <exclude>org/apache/parquet/bytes/CapacityByteArrayOutputStream</exclude> <!-- moved to parquet-common -->
+ <exclude>org/apache/parquet/bytes/ConcatenatingByteArrayCollector</exclude> <!-- moved to parquet-common -->
+ <exclude>org/apache/parquet/bytes/LittleEndianDataInputStream</exclude> <!-- moved to parquet-common -->
+ <exclude>org/apache/parquet/bytes/LittleEndianDataOutputStream</exclude> <!-- moved to parquet-common -->
+ <exclude>org/apache/parquet/hadoop/metadata/CompressionCodecName</exclude> <!-- moved to parquet-common -->
+ <exclude>org/apache/parquet/hadoop/codec/CompressionCodecNotSupportedException</exclude> <!-- moved to parquet-common -->
</excludes>
</requireBackwardCompatibility>
</rules>