crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jwi...@apache.org
Subject git commit: CRUNCH-302: Initialize PType input/output functions when writing data in the MemPipeline.
Date Fri, 22 Nov 2013 02:33:55 GMT
Updated Branches:
  refs/heads/master b9e9672d9 -> 2a8b6c149


CRUNCH-302: Initialize PType input/output functions when writing data in the MemPipeline.


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/2a8b6c14
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/2a8b6c14
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/2a8b6c14

Branch: refs/heads/master
Commit: 2a8b6c1498f0a0eff3f595652f1745098beca8d5
Parents: b9e9672
Author: Josh Wills <jwills@apache.org>
Authored: Thu Nov 21 17:40:36 2013 -0800
Committer: Josh Wills <jwills@apache.org>
Committed: Thu Nov 21 17:40:36 2013 -0800

----------------------------------------------------------------------
 .../impl/mem/MemPipelineFileReadingWritingIT.java    | 15 +++++++++++++++
 .../java/org/apache/crunch/impl/mem/MemPipeline.java |  7 ++++---
 2 files changed, 19 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/2a8b6c14/crunch-core/src/it/java/org/apache/crunch/impl/mem/MemPipelineFileReadingWritingIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/impl/mem/MemPipelineFileReadingWritingIT.java b/crunch-core/src/it/java/org/apache/crunch/impl/mem/MemPipelineFileReadingWritingIT.java
index bb75681..5cefc2d 100644
--- a/crunch-core/src/it/java/org/apache/crunch/impl/mem/MemPipelineFileReadingWritingIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/impl/mem/MemPipelineFileReadingWritingIT.java
@@ -27,7 +27,10 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Set;
 
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecordBuilder;
 import org.apache.commons.io.filefilter.WildcardFileFilter;
@@ -248,6 +251,18 @@ public class MemPipelineFileReadingWritingIT {
 
   }
 
+  @Test
+  public void testMemPipelineWriteAvroFile_Tuples() throws IOException {
+    AvroType<Pair<String, Long>> at = Avros.pairs(Avros.strings(), Avros.longs());
+    Set<Pair<String, Long>> data = ImmutableSet.of(Pair.of("a", 1L), Pair.of("b", 2L), Pair.of("c", 3L));
+        PCollection < Pair < String, Long >> pc = MemPipeline.typedCollectionOf(at, data);
+    pc.write(To.avroFile(outputDir.getPath()));
+
+    Iterable<Pair<String, Long>> it = MemPipeline.getInstance().read(
+        at.getDefaultFileSource(new Path(outputDir.getPath()))).materialize();
+    assertEquals(data, Sets.newHashSet(it));
+  }
+
   static class SimpleBean {
     public int value;
 

http://git-wip-us.apache.org/repos/asf/crunch/blob/2a8b6c14/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java b/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
index 5e6dfa0..ce411ca 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
@@ -25,7 +25,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 import org.apache.avro.file.DataFileWriter;
-import org.apache.avro.generic.GenericContainer;
 import org.apache.avro.io.DatumWriter;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -51,7 +50,6 @@ import org.apache.crunch.types.PTableType;
 import org.apache.crunch.types.PType;
 import org.apache.crunch.types.avro.AvroType;
 import org.apache.crunch.types.avro.Avros;
-import org.apache.crunch.types.avro.ReflectDataFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -188,6 +186,9 @@ public class MemPipeline implements Pipeline {
     }
     activeTargets.add(target);
     if (target instanceof PathTarget) {
+      if (collection.getPType() != null) {
+        collection.getPType().initialize(getConfiguration());
+      }
       Path path = ((PathTarget) target).getPath();
       try {
         FileSystem fs = path.getFileSystem(conf);
@@ -245,7 +246,7 @@ public class MemPipeline implements Pipeline {
     dataFileWriter.create(avroType.getSchema(), outputStream);
 
     for (Object record : recordCollection.materialize()) {
-      dataFileWriter.append(record);
+      dataFileWriter.append(avroType.getOutputMapFn().map(record));
     }
 
     dataFileWriter.close();


Mime
View raw message