From: zivanfi@apache.org
Reply-To: dev@parquet.apache.org
To: commits@parquet.apache.org
Date: Mon, 19 Nov 2018 12:18:32 +0000
Message-ID: <154262991274.15312.14914960774232644562@gitbox.apache.org>
Subject: [parquet-mr] branch master updated: PARQUET-1456: Use page index, ParquetFileReader throw ArrayIndexOutOfBoundsException (#548)

This is an automated email from the ASF dual-hosted git repository.

zivanfi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git

The following commit(s) were added to refs/heads/master by this push:
     new 5250dac  PARQUET-1456: Use page index, ParquetFileReader throw ArrayIndexOutOfBoundsException (#548)
5250dac is described below

commit 5250dac71600d42edfa324786593a7d56135aa26
Author: Gabor Szadovszky
AuthorDate: Mon Nov 19 13:18:28 2018 +0100

    PARQUET-1456: Use page index, ParquetFileReader throw ArrayIndexOutOfBoundsException (#548)

    The usage of static caching in the page index implementation did not
    allow using multiple readers at the same time.
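For context, the failure mode is the classic shared-mutable-state race: ColumnIndexBuilder cached one stateful builder per primitive type in a static EnumMap, so two ParquetFileReader instances on different threads could interleave fill()/build() calls on the same builder instance. The following is a minimal standalone sketch of that hazard pattern (hypothetical names, not parquet-mr code); run long enough, the two threads typically crash with an IndexOutOfBoundsException like the one reported:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class StaticBuilderCacheRace {
      // Stand-in for ColumnIndexBuilder: mutable state reused across calls.
      static class StatefulBuilder {
        private final List<Integer> values = new ArrayList<>();

        void fill(List<Integer> input) {
          values.clear();        // thread A may clear...
          values.addAll(input);
        }

        int build() {
          return values.get(values.size() - 1);  // ...while thread B indexes past the end
        }
      }

      // The flawed pattern the commit removes: one shared builder per key, JVM-wide.
      private static final Map<String, StatefulBuilder> BUILDERS = new HashMap<>();

      static int buildCached(String type, List<Integer> input) {
        StatefulBuilder builder = BUILDERS.computeIfAbsent(type, t -> new StatefulBuilder());
        builder.fill(input);
        return builder.build();
      }

      public static void main(String[] args) throws InterruptedException {
        Runnable reader = () -> {
          for (int i = 0; i < 1_000_000; i++) {
            buildCached("INT64", Arrays.asList(1, 2, 3));
          }
        };
        Thread t1 = new Thread(reader);
        Thread t2 = new Thread(reader);
        t1.start();
        t2.start();
        t1.join();
        t2.join();
      }
    }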
---
 .../column/columnindex/ColumnIndexBuilder.java     |  14 +-
 .../parquet/hadoop/TestMultipleWriteRead.java      | 250 +++++++++++++++++++++
 2 files changed, 252 insertions(+), 12 deletions(-)

diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndexBuilder.java b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndexBuilder.java
index b28fdde..e28a380 100644
--- a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndexBuilder.java
+++ b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndexBuilder.java
@@ -22,15 +22,12 @@ import static java.util.Objects.requireNonNull;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.EnumMap;
 import java.util.Formatter;
 import java.util.List;
-import java.util.Map;
 import java.util.PrimitiveIterator;
 import java.util.function.IntPredicate;
 
 import org.apache.parquet.column.statistics.Statistics;
-import org.apache.parquet.filter2.predicate.UserDefinedPredicate;
 import org.apache.parquet.filter2.predicate.Operators.And;
 import org.apache.parquet.filter2.predicate.Operators.Eq;
 import org.apache.parquet.filter2.predicate.Operators.Gt;
@@ -42,11 +39,11 @@ import org.apache.parquet.filter2.predicate.Operators.Not;
 import org.apache.parquet.filter2.predicate.Operators.NotEq;
 import org.apache.parquet.filter2.predicate.Operators.Or;
 import org.apache.parquet.filter2.predicate.Operators.UserDefined;
+import org.apache.parquet.filter2.predicate.UserDefinedPredicate;
 import org.apache.parquet.io.api.Binary;
 import org.apache.parquet.schema.PrimitiveComparator;
 import org.apache.parquet.schema.PrimitiveStringifier;
 import org.apache.parquet.schema.PrimitiveType;
-import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
 
 import it.unimi.dsi.fastutil.booleans.BooleanArrayList;
 import it.unimi.dsi.fastutil.booleans.BooleanList;
@@ -394,8 +391,6 @@ public abstract class ColumnIndexBuilder {
     }
   };
 
-  private static final Map<PrimitiveTypeName, ColumnIndexBuilder> BUILDERS = new EnumMap<>(PrimitiveTypeName.class);
-
   private PrimitiveType type;
   private final BooleanList nullPages = new BooleanArrayList();
   private final LongList nullCounts = new LongArrayList();
@@ -469,12 +464,7 @@ public abstract class ColumnIndexBuilder {
       List<ByteBuffer> minValues,
       List<ByteBuffer> maxValues) {
 
-    PrimitiveTypeName typeName = type.getPrimitiveTypeName();
-    ColumnIndexBuilder builder = BUILDERS.get(typeName);
-    if (builder == null) {
-      builder = createNewBuilder(type, Integer.MAX_VALUE);
-      BUILDERS.put(typeName, builder);
-    }
+    ColumnIndexBuilder builder = createNewBuilder(type, Integer.MAX_VALUE);
     builder.fill(nullPages, nullCounts, minValues, maxValues);
     ColumnIndexBase columnIndex = builder.build(type);
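The fix simply drops the static cache and allocates a fresh ColumnIndexBuilder on every call, so each reader works on its own instance. Builder construction is cheap relative to decoding a column index, so trading a small per-call allocation for thread safety is a reasonable choice; a thread-confined cache (e.g. a ThreadLocal map of builders) could avoid even that allocation, but that would be an alternative design, not what this commit does.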
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestMultipleWriteRead.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestMultipleWriteRead.java
new file mode 100644
index 0000000..4a6bd3b
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestMultipleWriteRead.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import static org.apache.parquet.filter2.predicate.FilterApi.and;
+import static org.apache.parquet.filter2.predicate.FilterApi.binaryColumn;
+import static org.apache.parquet.filter2.predicate.FilterApi.eq;
+import static org.apache.parquet.filter2.predicate.FilterApi.gtEq;
+import static org.apache.parquet.filter2.predicate.FilterApi.intColumn;
+import static org.apache.parquet.filter2.predicate.FilterApi.lt;
+import static org.apache.parquet.filter2.predicate.FilterApi.notEq;
+import static org.apache.parquet.io.api.Binary.fromString;
+import static org.apache.parquet.schema.LogicalTypeAnnotation.intType;
+import static org.apache.parquet.schema.LogicalTypeAnnotation.stringType;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.GroupFactory;
+import org.apache.parquet.example.data.simple.SimpleGroupFactory;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.filter2.compat.FilterCompat.Filter;
+import org.apache.parquet.hadoop.example.ExampleParquetWriter;
+import org.apache.parquet.hadoop.example.GroupReadSupport;
+import org.apache.parquet.hadoop.example.GroupWriteSupport;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Types;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.google.common.io.Files;
+
+/**
+ * Tests writing/reading multiple files at the same time (using multiple threads). Readers/writers do not support
+ * concurrency but the API shall support using separate reader/writer instances to read/write parquet files in different
+ * threads. (Of course, simultaneous writing to the same file is not supported.)
+ */
+public class TestMultipleWriteRead {
+  private static final MessageType SCHEMA = Types.buildMessage()
+      .required(INT32).named("id")
+      .required(BINARY).as(stringType()).named("name")
+      .requiredList().requiredElement(INT64).as(intType(64, false)).named("phone_numbers")
+      .optional(BINARY).as(stringType()).named("comment")
+      .named("msg");
+  private static final Comparator<Binary> BINARY_COMPARATOR = Types.required(BINARY).as(stringType()).named("dummy")
+      .comparator();
+
+  private static class DataGenerator implements Supplier<Group> {
+    private static final String ALPHABET = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz -";
+    private static final int NAME_MIN_SIZE = 5;
+    private static final int NAME_MAX_SIZE = 30;
+    private static final int PHONE_NUMBERS_MAX_SIZE = 5;
+    private static final long MIN_PHONE_NUMBER = 36_1_000_000;
+    private static final long MAX_PHONE_NUMBER = 36_1_999_999;
+    private static final double COMMENT_NULL_RATIO = 0.3;
+    private static final int COMMENT_MAX_SIZE = 200;
+
+    private final Random random;
+    private final GroupFactory factory = new SimpleGroupFactory(SCHEMA);
+
+    DataGenerator(long seed) {
+      random = new Random(seed);
+    }
+
+    private String getString(int minSize, int maxSize) {
+      int size = random.nextInt(maxSize - minSize) + minSize;
+      StringBuilder builder = new StringBuilder(size);
+      for (int i = 0; i < size; ++i) {
+        builder.append(ALPHABET.charAt(random.nextInt(ALPHABET.length())));
+      }
+      return builder.toString();
+    }
+
+    @Override
+    public Group get() {
+      Group group = factory.newGroup();
+      group.add("id", random.nextInt());
+      group.add("name", getString(NAME_MIN_SIZE, NAME_MAX_SIZE));
+      Group phoneNumbers = group.addGroup("phone_numbers");
+      for (int i = 0, n = random.nextInt(PHONE_NUMBERS_MAX_SIZE); i < n; ++i) {
+        Group phoneNumber = phoneNumbers.addGroup(0);
+        phoneNumber.add(0, random.nextLong() % (MAX_PHONE_NUMBER - MIN_PHONE_NUMBER) + MIN_PHONE_NUMBER);
+      }
+      if (random.nextDouble() >= COMMENT_NULL_RATIO) {
+        group.add("comment", getString(0, COMMENT_MAX_SIZE));
+      }
+      return group;
+    }
+  }
+
+  private static Path tmpDir;
+
+  @BeforeClass
+  public static void createTmpDir() {
+    tmpDir = new Path(Files.createTempDir().getAbsolutePath().toString());
+  }
+
+  @AfterClass
+  public static void deleteTmpDir() throws IOException {
+    tmpDir.getFileSystem(new Configuration()).delete(tmpDir, true);
+  }
+
+  private Path writeFile(Iterable<Group> data) throws IOException {
+    Path file = new Path(tmpDir, "testMultipleReadWrite_" + UUID.randomUUID() + ".parquet");
+    try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(file)
+        .config(GroupWriteSupport.PARQUET_EXAMPLE_SCHEMA, SCHEMA.toString())
+        .build()) {
+      for (Group group : data) {
+        writer.write(group);
+      }
+    }
+    return file;
+  }
+
+  private void validateFile(Path file, List<Group> data) throws IOException {
+    try (ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), file).build()) {
+      for (Group group : data) {
+        assertEquals(group.toString(), reader.read().toString());
+      }
+    }
+  }
+
+  private void validateFile(Path file, Filter filter, Stream<Group> data) throws IOException {
+    try (ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), file)
+        .withFilter(filter)
+        .build()) {
+      for (Iterator<Group> it = data.iterator(); it.hasNext();) {
+        assertEquals(it.next().toString(), reader.read().toString());
+      }
+    }
+  }
+
+  private void validateFileWithIdFilter(Path file, List<Group> data) throws IOException {
+    validateFile(file, FilterCompat.get(eq(intColumn("id"), 0)),
+        data.stream().filter(group -> group.getInteger("id", 0) == 0));
+  }
+
+  private void validateFileWithCommentFilter(Path file, List<Group> data) throws IOException {
+    validateFile(file, FilterCompat.get(eq(binaryColumn("comment"), null)),
+        data.stream().filter(group -> group.getFieldRepetitionCount("comment") == 0));
+  }
+
+  private void validateFileWithComplexFilter(Path file, List<Group> data) throws IOException {
+    Binary binaryValueB = fromString("b");
+    Filter filter = FilterCompat.get(
+        and(
+            gtEq(intColumn("id"), 0),
+            and(
+                lt(binaryColumn("name"), binaryValueB),
+                notEq(binaryColumn("comment"), null))));
+    Predicate<Group> predicate = group -> group.getInteger("id", 0) >= 0
+        && BINARY_COMPARATOR.compare(group.getBinary("name", 0), binaryValueB) < 0
+        && group.getFieldRepetitionCount("comment") > 0;
+    validateFile(file, filter, data.stream().filter(predicate));
+  }
+
+  @Test
+  public void testWriteRead() throws Throwable {
+    // 10 random datasets with row counts from 10000 down to 1000
+    List<List<Group>> data = new ArrayList<>();
+    for (int i = 0; i < 10; ++i) {
+      data.add(Stream.generate(new DataGenerator(i)).limit(10000 - i * 1000).collect(Collectors.toList()));
+    }
+
+    // Writes (and reads back the data to validate) the random values using 6 threads
+    List<Future<Path>> futureFiles = new ArrayList<>();
+    ExecutorService exec = Executors.newFixedThreadPool(6);
+    for (List<Group> d : data) {
+      futureFiles.add(exec.submit(() -> {
+        Path file = writeFile(d);
+        validateFile(file, d);
+        return file;
+      }));
+    }
+    List<Path> files = new ArrayList<>();
+    for (Future<Path> future : futureFiles) {
+      try {
+        files.add(future.get());
+      } catch (ExecutionException e) {
+        throw e.getCause();
+      }
+    }
+
+    // Executes 3 filterings on each file using 6 threads
+    List<Future<Void>> futures = new ArrayList<>();
+    for (int i = 0; i < 10; ++i) {
+      Path file = files.get(i);
+      List<Group> d = data.get(i);
+      futures.add(exec.submit(() -> {
+        validateFileWithIdFilter(file, d);
+        return null;
+      }));
+      futures.add(exec.submit(() -> {
+        validateFileWithCommentFilter(file, d);
+        return null;
+      }));
+      futures.add(exec.submit(() -> {
+        validateFileWithComplexFilter(file, d);
+        return null;
+      }));
+    }
+    for (Future<Void> future : futures) {
+      try {
+        future.get();
+      } catch (ExecutionException e) {
+        throw e.getCause();
+      }
+    }
+  }
+}
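With the shared static BUILDERS cache still in place, the concurrent filtering phase of this test would be expected to fail intermittently with the ArrayIndexOutOfBoundsException reported in PARQUET-1456, since all threads would mutate the same cached builder while decoding column indexes; with the fix, every call gets its own builder instance. Assuming a standard parquet-mr checkout, the test should be runnable on its own through Surefire's test filter (mvn test -pl parquet-hadoop -Dtest=TestMultipleWriteRead).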