crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gr...@apache.org
Subject git commit: CRUNCH-224 Support SequenceFiles in MemPipeline
Date Thu, 20 Jun 2013 08:43:51 GMT
Updated Branches:
  refs/heads/master 687894564 -> c51bcd63a


CRUNCH-224 Support SequenceFiles in MemPipeline

Contributed by Dominique Dierickx


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/c51bcd63
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/c51bcd63
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/c51bcd63

Branch: refs/heads/master
Commit: c51bcd63afffc6e5a9c8a171503bc487440b2d85
Parents: 6878945
Author: Gabriel Reid <greid@apache.org>
Authored: Thu Jun 20 09:03:08 2013 +0200
Committer: Gabriel Reid <greid@apache.org>
Committed: Thu Jun 20 09:03:08 2013 +0200

----------------------------------------------------------------------
 .../mem/MemPipelineFileReadingWritingIT.java    | 170 +++++++++++++++++++
 .../impl/mem/MemPipelineFileWritingIT.java      |  58 -------
 .../org/apache/crunch/impl/mem/MemPipeline.java | 102 ++++++++---
 3 files changed, 248 insertions(+), 82 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/c51bcd63/crunch-core/src/it/java/org/apache/crunch/impl/mem/MemPipelineFileReadingWritingIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/impl/mem/MemPipelineFileReadingWritingIT.java b/crunch-core/src/it/java/org/apache/crunch/impl/mem/MemPipelineFileReadingWritingIT.java
new file mode 100644
index 0000000..2d66672
--- /dev/null
+++ b/crunch-core/src/it/java/org/apache/crunch/impl/mem/MemPipelineFileReadingWritingIT.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mem;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.crunch.PCollection;
+import org.apache.crunch.Pair;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.Target;
+import org.apache.crunch.impl.mem.collect.MemCollection;
+import org.apache.crunch.impl.mem.collect.MemTable;
+import org.apache.crunch.io.From;
+import org.apache.crunch.io.To;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.test.TemporaryPaths;
+import org.apache.crunch.types.writable.Writables;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.Reader;
+import org.apache.hadoop.io.SequenceFile.Writer;
+import org.apache.hadoop.io.Text;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import com.google.common.base.Charsets;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.io.Files;
+
+public class MemPipelineFileReadingWritingIT {
+  @Rule
+  public TemporaryPath baseTmpDir = TemporaryPaths.create();
+  
+  private File inputFile;
+  private File outputFile;
+  
+  
+  private static final Collection<String> EXPECTED_COLLECTION = Lists.newArrayList("hello", "world");
+  @SuppressWarnings("unchecked")
+  private static final Collection<Pair<Integer, String>> EXPECTED_TABLE = Lists.newArrayList(
+                                                        Pair.of(1, "hello"), 
+                                                        Pair.of(2, "world"));
+  
+
+  @Before
+  public void setUp() throws IOException {
+    inputFile = baseTmpDir.getFile("test-read.seq");
+    outputFile = baseTmpDir.getFile("test-write.seq");
+  }
+
+  @Test
+  public void testMemPipelineFileWriter() throws Exception {
+    File tmpDir = baseTmpDir.getFile("mempipe");
+    Pipeline p = MemPipeline.getInstance();
+    PCollection<String> lines = MemPipeline.collectionOf("hello", "world");
+    p.writeTextFile(lines, tmpDir.toString());
+    p.done();
+    assertTrue(tmpDir.exists());
+    File[] files = tmpDir.listFiles();
+    assertTrue(files != null && files.length > 0);
+    for (File f : files) {
+      if (!f.getName().startsWith(".")) {
+        List<String> txt = Files.readLines(f, Charsets.UTF_8);
+        assertEquals(ImmutableList.of("hello", "world"), txt);
+      }
+    }
+  }
+
+  private void createTestSequenceFile(final File seqFile) throws IOException {
+    SequenceFile.Writer writer = null;
+    writer = new Writer(FileSystem.getLocal(baseTmpDir.getDefaultConfiguration()),
+              baseTmpDir.getDefaultConfiguration(), 
+              new Path(seqFile.toString()), 
+              IntWritable.class, Text.class);
+    writer.append(new IntWritable(1), new Text("hello"));
+    writer.append(new IntWritable(2), new Text("world"));
+    writer.close();
+  }
+
+  @Test
+  public void testMemPipelineReadSequenceFile() throws IOException {
+    // set up input
+    createTestSequenceFile(inputFile);
+
+    // read from sequence file
+    final PCollection<Pair<Integer, String>> readCollection = MemPipeline.getInstance().read(
+      From.sequenceFile(inputFile.toString(), 
+        Writables.tableOf(
+          Writables.ints(), 
+          Writables.strings())));
+
+    // assert read same as written.
+    assertEquals(EXPECTED_TABLE, Lists.newArrayList(readCollection.materialize()));
+  }
+
+  @Test
+  public void testMemPipelineWriteSequenceFile_PCollection() throws IOException {
+    // write
+    PCollection<String> collection = MemPipeline.typedCollectionOf(Writables.strings(), EXPECTED_COLLECTION);
+    final Target target = To.sequenceFile(outputFile.toString());
+    MemPipeline.getInstance().write(collection, target);
+
+    // read
+    final SequenceFile.Reader reader = new Reader(FileSystem.getLocal(
+      baseTmpDir.getDefaultConfiguration()), new Path(outputFile.toString()),
+        baseTmpDir.getDefaultConfiguration());
+    final List<String> actual = Lists.newArrayList();
+    final NullWritable key = NullWritable.get();
+    final Text value = new Text();
+    while (reader.next(key, value)) {
+      actual.add(value.toString());
+    }
+    reader.close();
+
+    // assert read same as written
+    assertEquals(EXPECTED_COLLECTION, actual);
+  }
+
+  @Test
+  public void testMemPipelineWriteSequenceFile_PTable() throws IOException {
+    // write
+    final MemTable<Integer, String> collection = new MemTable<Integer, String>(EXPECTED_TABLE, //
+        Writables.tableOf(
+          Writables.ints(), 
+          Writables.strings()), "test input");
+    final Target target = To.sequenceFile(outputFile.toString());
+    MemPipeline.getInstance().write(collection, target);
+
+    // read
+    final SequenceFile.Reader reader = new Reader(FileSystem.getLocal(baseTmpDir
+        .getDefaultConfiguration()), new Path(outputFile.toString()),
+        baseTmpDir.getDefaultConfiguration());
+    final List<Pair<Integer, String>> actual = Lists.newArrayList();
+    final IntWritable key = new IntWritable();
+    final Text value = new Text();
+    while (reader.next(key, value)) {
+      actual.add(Pair.of(key.get(), value.toString()));
+    }
+    reader.close();
+
+    // assert read same as written
+    assertEquals(EXPECTED_TABLE, actual);
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/c51bcd63/crunch-core/src/it/java/org/apache/crunch/impl/mem/MemPipelineFileWritingIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/impl/mem/MemPipelineFileWritingIT.java b/crunch-core/src/it/java/org/apache/crunch/impl/mem/MemPipelineFileWritingIT.java
deleted file mode 100644
index 976a43e..0000000
--- a/crunch-core/src/it/java/org/apache/crunch/impl/mem/MemPipelineFileWritingIT.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.impl.mem;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.io.File;
-import java.util.List;
-
-import org.apache.crunch.PCollection;
-import org.apache.crunch.Pipeline;
-import org.apache.crunch.test.TemporaryPath;
-import org.apache.crunch.test.TemporaryPaths;
-import org.junit.Rule;
-import org.junit.Test;
-
-import com.google.common.base.Charsets;
-import com.google.common.collect.ImmutableList;
-import com.google.common.io.Files;
-
-public class MemPipelineFileWritingIT {
-  @Rule
-  public TemporaryPath baseTmpDir = TemporaryPaths.create();
-
-  @Test
-  public void testMemPipelineFileWriter() throws Exception {
-    File tmpDir = baseTmpDir.getFile("mempipe");
-    Pipeline p = MemPipeline.getInstance();
-    PCollection<String> lines = MemPipeline.collectionOf("hello", "world");
-    p.writeTextFile(lines, tmpDir.toString());
-    p.done();
-    assertTrue(tmpDir.exists());
-    File[] files = tmpDir.listFiles();
-    assertTrue(files != null && files.length > 0);
-    for (File f : files) {
-      if (!f.getName().startsWith(".")) {
-        List<String> txt = Files.readLines(f, Charsets.UTF_8);
-        assertEquals(ImmutableList.of("hello", "world"), txt);
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/c51bcd63/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java b/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
index e2a2529..9001e51 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
@@ -17,9 +17,11 @@
  */
 package org.apache.crunch.impl.mem;
 
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.avro.Schema;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericContainer;
@@ -42,6 +44,8 @@ import org.apache.crunch.io.At;
 import org.apache.crunch.io.PathTarget;
 import org.apache.crunch.io.ReadableSource;
 import org.apache.crunch.io.avro.AvroFileTarget;
+import org.apache.crunch.io.seq.SeqFileTarget;
+import org.apache.crunch.types.Converter;
 import org.apache.crunch.types.PTableType;
 import org.apache.crunch.types.PType;
 import org.apache.crunch.types.avro.ReflectDataFactory;
@@ -49,12 +53,13 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.mapreduce.Counters;
 
-import java.io.IOException;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 
 public class MemPipeline implements Pipeline {
 
@@ -170,36 +175,48 @@ public class MemPipeline implements Pipeline {
   }
   
   @Override
-  public void write(PCollection<?> collection, Target target,
-      Target.WriteMode writeMode) {
+  public void write(PCollection<?> collection, Target target, Target.WriteMode writeMode) {
     target.handleExisting(writeMode, getConfiguration());
     if (writeMode != Target.WriteMode.APPEND && activeTargets.contains(target)) {
-      throw new CrunchRuntimeException("Target " + target + " is already written in the current run." +
-          " Use WriteMode.APPEND in order to write additional data to it.");
+      throw new CrunchRuntimeException("Target " + target
+          + " is already written in the current run."
+          + " Use WriteMode.APPEND in order to write additional data to it.");
     }
     activeTargets.add(target);
     if (target instanceof PathTarget) {
       Path path = ((PathTarget) target).getPath();
       try {
         FileSystem fs = path.getFileSystem(conf);
-        FSDataOutputStream os = fs.create(new Path(path, "out" + outputIndex));
         outputIndex++;
-        if (target instanceof AvroFileTarget) {
-          writeAvroFile(os, collection.materialize());
-        } else if (collection instanceof PTable) {
-          for (Object o : collection.materialize()) {
-            Pair p = (Pair) o;
-            os.writeBytes(p.first().toString());
-            os.writeBytes("\t");
-            os.writeBytes(p.second().toString());
-            os.writeBytes("\r\n");
+        if (target instanceof SeqFileTarget) {
+          if (collection instanceof PTable) {
+            writeSequenceFileFromPTable(fs, path, (PTable<?, ?>) collection);
+          } else {
+            writeSequenceFileFromPCollection(fs, path, collection);
           }
         } else {
-          for (Object o : collection.materialize()) {
-            os.writeBytes(o.toString() + "\r\n");
+          FSDataOutputStream os = fs.create(new Path(path, "out" + outputIndex));
+          if (target instanceof AvroFileTarget && !(collection instanceof PTable)) {
+
+            writeAvroFile(os, collection.materialize());
+          } else {
+            LOG.warn("Defaulting to write to a text file from MemPipeline");
+            if (collection instanceof PTable) {
+              for (Object o : collection.materialize()) {
+                Pair p = (Pair) o;
+                os.writeBytes(p.first().toString());
+                os.writeBytes("\t");
+                os.writeBytes(p.second().toString());
+                os.writeBytes("\r\n");
+              }
+            } else {
+              for (Object o : collection.materialize()) {
+                os.writeBytes(o.toString() + "\r\n");
+              }
+            }
           }
+          os.close();
         }
-        os.close();
       } catch (IOException e) {
         LOG.error("Exception writing target: " + target, e);
       }
@@ -233,7 +250,44 @@ public class MemPipeline implements Pipeline {
     dataFileWriter.close();
     outputStream.close();
   }
+  
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  private void writeSequenceFileFromPTable(final FileSystem fs, final Path path, final PTable table)
+      throws IOException {
+    final PTableType pType = table.getPTableType();
+    final Class<?> keyClass = pType.getConverter().getKeyClass();
+    final Class<?> valueClass = pType.getConverter().getValueClass();
+
+    final SequenceFile.Writer writer = new SequenceFile.Writer(fs, fs.getConf(), path, keyClass,
+        valueClass);
+
+    for (final Object o : table.materialize()) {
+      final Pair<?,?> p = (Pair) o;
+      final Object key = pType.getKeyType().getOutputMapFn().map(p.first());
+      final Object value = pType.getValueType().getOutputMapFn().map(p.second());
+      writer.append(key, value);
+    }
 
+    writer.close();
+  }
+  
+  private void writeSequenceFileFromPCollection(final FileSystem fs, final Path path,
+      final PCollection collection) throws IOException {
+    final PType pType = collection.getPType();
+    final Converter converter = pType.getConverter();
+    final Class valueClass = converter.getValueClass();
+
+    final SequenceFile.Writer writer = new SequenceFile.Writer(fs, fs.getConf(), path,
+        NullWritable.class, valueClass);
+
+    for (final Object o : collection.materialize()) {
+      final Object value = pType.getOutputMapFn().map(o);
+      writer.append(NullWritable.get(), value);
+    }
+
+    writer.close();
+  }
+   
   @Override
   public PCollection<String> readTextFile(String pathName) {
     return read(At.textFile(pathName));


Mime
View raw message