crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tomwh...@apache.org
Subject git commit: CRUNCH-421 Write Avro PTable from MemPipeline
Date Mon, 16 Jun 2014 16:23:19 GMT
Repository: crunch
Updated Branches:
  refs/heads/apache-crunch-0.8 c173dd20a -> 7da6de5c6


CRUNCH-421 Write Avro PTable from MemPipeline

Remove artificial incompatibility for writing Avro PTables from
within a MemPipeline. Test case provided by Tom White.

Signed-off-by: Tom White <tomwhite@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/7da6de5c
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/7da6de5c
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/7da6de5c

Branch: refs/heads/apache-crunch-0.8
Commit: 7da6de5c68a026e0182433dc9089cc9dfd250f05
Parents: c173dd2
Author: Gabriel Reid <greid@apache.org>
Authored: Mon Jun 16 16:42:44 2014 +0200
Committer: Tom White <tomwhite@apache.org>
Committed: Mon Jun 16 17:07:41 2014 +0100

----------------------------------------------------------------------
 .../crunch/io/avro/AvroMemPipelineIT.java       | 38 +++++++++++++++++---
 .../org/apache/crunch/impl/mem/MemPipeline.java |  2 +-
 2 files changed, 34 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/7da6de5c/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroMemPipelineIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroMemPipelineIT.java b/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroMemPipelineIT.java
index 40224e7..e501373 100644
--- a/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroMemPipelineIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroMemPipelineIT.java
@@ -21,17 +21,22 @@ import static org.junit.Assert.assertEquals;
 import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
-import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericData.Record;
-import org.apache.avro.generic.GenericRecord;
+import org.apache.crunch.MapFn;
 import org.apache.crunch.PCollection;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
 import org.apache.crunch.impl.mem.MemPipeline;
 import org.apache.crunch.io.At;
 import org.apache.crunch.io.To;
+import org.apache.crunch.lib.PTables;
 import org.apache.crunch.test.Person;
 import org.apache.crunch.test.TemporaryPath;
 import org.apache.crunch.test.TemporaryPaths;
@@ -42,9 +47,6 @@ import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
 public class AvroMemPipelineIT implements Serializable {
 
   private transient File avroFile;
@@ -157,4 +159,30 @@ public class AvroMemPipelineIT implements Serializable {
     return savedRecord;
   }
 
+  @Test
+  public void testMemPipelineWithPTable() {
+
+    String writeRecord = "John Doe";
+
+    final PCollection<String> collection = MemPipeline.typedCollectionOf(
+        Avros.strings(),
+        writeRecord);
+
+    PTable<Integer, String> writeCollection = collection.by(new MapFn<String, Integer>() {
+      @Override
+      public Integer map(String input) {
+        return input.length();
+      }
+    }, Avros.ints());
+
+    writeCollection.write(To.avroFile(avroFile.getAbsolutePath()));
+
+    PCollection<Pair<Integer, String>> readCollection = MemPipeline.getInstance().read(
+        At.avroFile(avroFile.getAbsolutePath(),
+            Avros.tableOf(Avros.ints(), Avros.strings())));
+
+    Map<Integer, String> map = PTables.asPTable(readCollection).asMap().getValue();
+    assertEquals(writeRecord, map.get(writeRecord.length()));
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/7da6de5c/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java b/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
index b3e9c54..42d1ca8 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
@@ -203,7 +203,7 @@ public class MemPipeline implements Pipeline {
             writeSequenceFileFromPCollection(fs, outputPath, collection);
           }
         } else {
-          if (target instanceof AvroFileTarget && !(collection instanceof PTable)) {
+          if (target instanceof AvroFileTarget){
             Path outputPath = new Path(path, "out" + outputIndex + ".avro");
             FSDataOutputStream os = fs.create(outputPath);
             writeAvroFile(os, collection);


Mime
View raw message