parquet-commits mailing list archives

From b...@apache.org
Subject [01/51] [partial] parquet-mr git commit: PARQUET-23: Rename to org.apache.parquet.
Date Mon, 27 Apr 2015 23:11:58 GMT
Repository: parquet-mr
Updated Branches:
  refs/heads/master 720b988fd -> b10870e4e
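
This is part 1 of the PARQUET-23 rename: classes under the parquet.* packages
move to org.apache.parquet.*, so files such as the test below are deleted here
and re-added under the new package in other parts of this 51-part commit. The
rename boils down to changes of the following shape (an illustrative sketch,
not a hunk from this message):

    -package parquet.hadoop.example;
    +package org.apache.parquet.hadoop.example;

    -import parquet.hadoop.ParquetInputFormat;
    +import org.apache.parquet.hadoop.ParquetInputFormat;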


http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/test/java/parquet/hadoop/example/TestInputOutputFormat.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/parquet/hadoop/example/TestInputOutputFormat.java b/parquet-hadoop/src/test/java/parquet/hadoop/example/TestInputOutputFormat.java
deleted file mode 100644
index ac8a957..0000000
--- a/parquet-hadoop/src/test/java/parquet/hadoop/example/TestInputOutputFormat.java
+++ /dev/null
@@ -1,291 +0,0 @@
-/* 
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package parquet.hadoop.example;
-
-import static java.lang.Thread.sleep;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileReader;
-import java.io.IOException;
-import java.lang.reflect.Method;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Counter;
-import org.apache.hadoop.mapreduce.CounterGroup;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
-import org.junit.Before;
-import org.junit.Test;
-
-import parquet.Log;
-import parquet.example.data.Group;
-import parquet.example.data.simple.SimpleGroupFactory;
-import parquet.hadoop.ParquetInputFormat;
-import parquet.hadoop.ParquetOutputFormat;
-import parquet.hadoop.api.DelegatingReadSupport;
-import parquet.hadoop.api.DelegatingWriteSupport;
-import parquet.hadoop.api.InitContext;
-import parquet.hadoop.api.ReadSupport;
-import parquet.hadoop.metadata.CompressionCodecName;
-import parquet.hadoop.util.ContextUtil;
-import parquet.schema.MessageTypeParser;
-
-public class TestInputOutputFormat {
-  private static final Log LOG = Log.getLog(TestInputOutputFormat.class);
-  final Path parquetPath = new Path("target/test/example/TestInputOutputFormat/parquet");
-  final Path inputPath = new Path("src/test/java/parquet/hadoop/example/TestInputOutputFormat.java");
-  final Path outputPath = new Path("target/test/example/TestInputOutputFormat/out");
-  Job writeJob;
-  Job readJob;
-  private String writeSchema;
-  private String readSchema;
-  private String partialSchema;
-  private Configuration conf;
-
-  private Class<? extends Mapper<?,?,?,?>> readMapperClass;
-  private Class<? extends Mapper<?,?,?,?>> writeMapperClass;
-
-  @Before
-  public void setUp() {
-    conf = new Configuration();
-    writeSchema = "message example {\n" +
-            "required int32 line;\n" +
-            "required binary content;\n" +
-            "}";
-
-    readSchema = "message example {\n" +
-            "required int32 line;\n" +
-            "required binary content;\n" +
-            "}";
-
-    partialSchema = "message example {\n" +
-            "required int32 line;\n" +
-            "}";
-
-    readMapperClass = ReadMapper.class;
-    writeMapperClass = WriteMapper.class;
-  }
-
-
-  public static final class MyWriteSupport extends DelegatingWriteSupport<Group> {
-
-    private long count = 0;
-
-    public MyWriteSupport() {
-      super(new GroupWriteSupport());
-    }
-
-    @Override
-    public void write(Group record) {
-      super.write(record);
-      ++ count;
-    }
-
-    @Override
-    public parquet.hadoop.api.WriteSupport.FinalizedWriteContext finalizeWrite() {
-      Map<String, String> extraMetadata = new HashMap<String, String>();
-      extraMetadata.put("my.count", String.valueOf(count));
-      return new FinalizedWriteContext(extraMetadata);
-    }
-  }
-
-  public static final class MyReadSupport extends DelegatingReadSupport<Group> {
-    public MyReadSupport() {
-      super(new GroupReadSupport());
-    }
-
-    @Override
-    public parquet.hadoop.api.ReadSupport.ReadContext init(InitContext context) {
-      Set<String> counts = context.getKeyValueMetadata().get("my.count");
-      assertTrue("counts: " + counts, counts.size() > 0);
-      return super.init(context);
-    }
-  }
-
-  public static class ReadMapper extends Mapper<LongWritable, Text, Void, Group> {
-    private SimpleGroupFactory factory;
-
-    protected void setup(org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, Void, Group>.Context context) throws java.io.IOException, InterruptedException {
-      factory = new SimpleGroupFactory(GroupWriteSupport.getSchema(ContextUtil.getConfiguration(context)));
-    }
-
-    ;
-
-    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Void, Group>.Context context) throws java.io.IOException, InterruptedException {
-      Group group = factory.newGroup()
-              .append("line", (int) key.get())
-              .append("content", value.toString());
-      context.write(null, group);
-    }
-  }
-
-  public static class WriteMapper extends Mapper<Void, Group, LongWritable, Text> {
-    protected void map(Void key, Group value, Mapper<Void, Group, LongWritable, Text>.Context context) throws IOException, InterruptedException {
-      context.write(new LongWritable(value.getInteger("line", 0)), new Text(value.getString("content", 0)));
-    }
-  }
-  public static class PartialWriteMapper extends Mapper<Void, Group, LongWritable, Text> {
-    protected void map(Void key, Group value, Mapper<Void, Group, LongWritable, Text>.Context context) throws IOException, InterruptedException {
-      context.write(new LongWritable(value.getInteger("line", 0)), new Text("dummy"));
-    }
-  }
-  private void runMapReduceJob(CompressionCodecName codec) throws IOException, ClassNotFoundException, InterruptedException {
-    runMapReduceJob(codec, Collections.<String, String>emptyMap());
-  }
-  private void runMapReduceJob(CompressionCodecName codec, Map<String, String> extraConf) throws IOException, ClassNotFoundException, InterruptedException {
-    Configuration conf = new Configuration(this.conf);
-    for (Map.Entry<String, String> entry : extraConf.entrySet()) {
-      conf.set(entry.getKey(), entry.getValue());
-    }
-    final FileSystem fileSystem = parquetPath.getFileSystem(conf);
-    fileSystem.delete(parquetPath, true);
-    fileSystem.delete(outputPath, true);
-    {
-      writeJob = new Job(conf, "write");
-      TextInputFormat.addInputPath(writeJob, inputPath);
-      writeJob.setInputFormatClass(TextInputFormat.class);
-      writeJob.setNumReduceTasks(0);
-      ParquetOutputFormat.setCompression(writeJob, codec);
-      ParquetOutputFormat.setOutputPath(writeJob, parquetPath);
-      writeJob.setOutputFormatClass(ParquetOutputFormat.class);
-      writeJob.setMapperClass(readMapperClass);
-
-      ParquetOutputFormat.setWriteSupportClass(writeJob, MyWriteSupport.class);
-      GroupWriteSupport.setSchema(
-              MessageTypeParser.parseMessageType(writeSchema),
-              writeJob.getConfiguration());
-      writeJob.submit();
-      waitForJob(writeJob);
-    }
-    {
-      conf.set(ReadSupport.PARQUET_READ_SCHEMA, readSchema);
-      readJob = new Job(conf, "read");
-
-      readJob.setInputFormatClass(ParquetInputFormat.class);
-      ParquetInputFormat.setReadSupportClass(readJob, MyReadSupport.class);
-
-      ParquetInputFormat.setInputPaths(readJob, parquetPath);
-      readJob.setOutputFormatClass(TextOutputFormat.class);
-      TextOutputFormat.setOutputPath(readJob, outputPath);
-      readJob.setMapperClass(writeMapperClass);
-      readJob.setNumReduceTasks(0);
-      readJob.submit();
-      waitForJob(readJob);
-    }
-  }
-
-  private void testReadWrite(CompressionCodecName codec) throws IOException, ClassNotFoundException, InterruptedException {
-    testReadWrite(codec, Collections.<String, String>emptyMap());
-  }
-  private void testReadWrite(CompressionCodecName codec, Map<String, String> conf) throws IOException, ClassNotFoundException, InterruptedException {
-    runMapReduceJob(codec, conf);
-    final BufferedReader in = new BufferedReader(new FileReader(new File(inputPath.toString())));
-    final BufferedReader out = new BufferedReader(new FileReader(new File(outputPath.toString(), "part-m-00000")));
-    String lineIn;
-    String lineOut = null;
-    int lineNumber = 0;
-    while ((lineIn = in.readLine()) != null && (lineOut = out.readLine()) != null) {
-      ++lineNumber;
-      lineOut = lineOut.substring(lineOut.indexOf("\t") + 1);
-      assertEquals("line " + lineNumber, lineIn, lineOut);
-    }
-    assertNull("line " + lineNumber, out.readLine());
-    assertNull("line " + lineNumber, lineIn);
-    in.close();
-    out.close();
-  }
-
-  @Test
-  public void testReadWrite() throws IOException, ClassNotFoundException, InterruptedException {
-    // TODO: Lzo requires additional external setup steps so leave it out for now
-    testReadWrite(CompressionCodecName.GZIP);
-    testReadWrite(CompressionCodecName.UNCOMPRESSED);
-    testReadWrite(CompressionCodecName.SNAPPY);
-  }
-
-  @Test
-  public void testReadWriteTaskSideMD() throws IOException, ClassNotFoundException, InterruptedException {
-    testReadWrite(CompressionCodecName.UNCOMPRESSED, new HashMap<String, String>() {{ put("parquet.task.side.metadata", "true"); }});
-  }
-
-  @Test
-  public void testProjection() throws Exception {
-    readSchema = partialSchema;
-    writeMapperClass = PartialWriteMapper.class;
-    runMapReduceJob(CompressionCodecName.GZIP);
-  }
-
-  private static long value(Job job, String groupName, String name) throws Exception {
-    // getGroup moved to AbstractCounters
-    Method getGroup = org.apache.hadoop.mapreduce.Counters.class.getMethod("getGroup", String.class);
-    // CounterGroup changed to an interface
-    Method findCounter = org.apache.hadoop.mapreduce.CounterGroup.class.getMethod("findCounter", String.class);
-    // Counter changed to an interface
-    Method getValue = org.apache.hadoop.mapreduce.Counter.class.getMethod("getValue");
-    CounterGroup group = (CounterGroup) getGroup.invoke(job.getCounters(), groupName);
-    Counter counter = (Counter) findCounter.invoke(group, name);
-    return (Long) getValue.invoke(counter);
-  }
-
-  @Test
-  public void testReadWriteWithCounter() throws Exception {
-    runMapReduceJob(CompressionCodecName.GZIP);
-    assertTrue(value(readJob, "parquet", "bytesread") > 0L);
-    assertTrue(value(readJob, "parquet", "bytestotal") > 0L);
-    assertTrue(value(readJob, "parquet", "bytesread")
-            == value(readJob, "parquet", "bytestotal"));
-    // not testing the time-read counter since it could be zero when the data size is too small
-  }
-
-  @Test
-  public void testReadWriteWithoutCounter() throws Exception {
-    conf.set("parquet.benchmark.time.read", "false");
-    conf.set("parquet.benchmark.bytes.total", "false");
-    conf.set("parquet.benchmark.bytes.read", "false");
-    runMapReduceJob(CompressionCodecName.GZIP);
-    assertTrue(value(readJob, "parquet", "bytesread") == 0L);
-    assertTrue(value(readJob, "parquet", "bytestotal") == 0L);
-    assertTrue(value(readJob, "parquet", "timeread") == 0L);
-  }
-
-  private void waitForJob(Job job) throws InterruptedException, IOException {
-    while (!job.isComplete()) {
-      LOG.debug("waiting for job " + job.getJobName());
-      sleep(100);
-    }
-    LOG.info("status for job " + job.getJobName() + ": " + (job.isSuccessful() ? "SUCCESS" : "FAILURE"));
-    if (!job.isSuccessful()) {
-      throw new RuntimeException("job failed " + job.getJobName());
-    }
-  }
-}
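
For context, the deleted test drives a two-job round trip: a map-only job
converts this source file from text to Parquet through ParquetOutputFormat,
then a second map-only job reads the Parquet data back to text through
ParquetInputFormat, and the output is compared line by line with the input.
Below is a minimal standalone sketch of that round trip, reusing the mappers
and read/write supports defined above; the RoundTripSketch driver itself is
hypothetical and not part of this commit.

package parquet.hadoop.example;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import parquet.hadoop.ParquetInputFormat;
import parquet.hadoop.ParquetOutputFormat;
import parquet.hadoop.api.ReadSupport;
import parquet.hadoop.metadata.CompressionCodecName;
import parquet.schema.MessageTypeParser;

// Hypothetical driver (not part of this commit) reusing the test's mappers
// and read/write supports from the file deleted above.
public class RoundTripSketch {
  public static void main(String[] args) throws Exception {
    Path inputPath = new Path(args[0]);    // plain-text source
    Path parquetPath = new Path(args[1]);  // intermediate Parquet data
    Path outputPath = new Path(args[2]);   // round-tripped text
    String schema = "message example {\n"
        + "required int32 line;\n"
        + "required binary content;\n"
        + "}";

    Configuration conf = new Configuration();

    // Job 1: text in, Parquet out (map-only).
    Job writeJob = new Job(conf, "write");
    TextInputFormat.addInputPath(writeJob, inputPath);
    writeJob.setInputFormatClass(TextInputFormat.class);
    writeJob.setMapperClass(TestInputOutputFormat.ReadMapper.class); // Text -> Group
    writeJob.setNumReduceTasks(0);
    writeJob.setOutputFormatClass(ParquetOutputFormat.class);
    ParquetOutputFormat.setWriteSupportClass(writeJob, TestInputOutputFormat.MyWriteSupport.class);
    ParquetOutputFormat.setCompression(writeJob, CompressionCodecName.GZIP);
    ParquetOutputFormat.setOutputPath(writeJob, parquetPath);
    GroupWriteSupport.setSchema(MessageTypeParser.parseMessageType(schema),
        writeJob.getConfiguration());
    if (!writeJob.waitForCompletion(true)) throw new RuntimeException("write job failed");

    // Job 2: Parquet in, text out (map-only), reading with the same schema.
    conf.set(ReadSupport.PARQUET_READ_SCHEMA, schema);
    Job readJob = new Job(conf, "read");
    readJob.setInputFormatClass(ParquetInputFormat.class);
    ParquetInputFormat.setReadSupportClass(readJob, TestInputOutputFormat.MyReadSupport.class);
    ParquetInputFormat.setInputPaths(readJob, parquetPath);
    readJob.setMapperClass(TestInputOutputFormat.WriteMapper.class); // Group -> Text
    readJob.setNumReduceTasks(0);
    readJob.setOutputFormatClass(TextOutputFormat.class);
    TextOutputFormat.setOutputPath(readJob, outputPath);
    if (!readJob.waitForCompletion(true)) throw new RuntimeException("read job failed");
  }
}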

