crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tzo...@apache.org
Subject [1/3] git commit: Extend MemPipeline.write to support Avro types (CRUNCH-204)
Date Fri, 10 May 2013 06:22:45 GMT
Updated Branches:
  refs/heads/master d864f2fd4 -> 70da18c54


Extend MemPipeline.write to support Avro types (CRUNCH-204)


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/2276ee05
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/2276ee05
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/2276ee05

Branch: refs/heads/master
Commit: 2276ee050d2a22ef74c75d462fa7ddf22a38a67b
Parents: d864f2f
Author: tzolov <tzolov@apache.org>
Authored: Thu May 9 20:35:42 2013 +0200
Committer: tzolov <tzolov@apache.org>
Committed: Thu May 9 20:35:42 2013 +0200

----------------------------------------------------------------------
 .../crunch/impl/mr/collect/UnionCollectionIT.java  |    2 +-
 .../org/apache/crunch/impl/mem/MemPipeline.java    |   36 ++++++++++++++-
 2 files changed, 36 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/2276ee05/crunch-core/src/it/java/org/apache/crunch/impl/mr/collect/UnionCollectionIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/impl/mr/collect/UnionCollectionIT.java b/crunch-core/src/it/java/org/apache/crunch/impl/mr/collect/UnionCollectionIT.java
index f9f73b2..2832437 100644
--- a/crunch-core/src/it/java/org/apache/crunch/impl/mr/collect/UnionCollectionIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/impl/mr/collect/UnionCollectionIT.java
@@ -142,7 +142,7 @@ public class UnionCollectionIT {
 
   private void checkFileContents(String filePath) throws IOException {
 
-    List<String> fileContentValues = (typeFamily != AvroTypeFamily.getInstance() || !(pipeline instanceof MRPipeline)) ? Lists
+    List<String> fileContentValues = (typeFamily != AvroTypeFamily.getInstance())? Lists
         .newArrayList(pipeline.read(At.textFile(filePath, typeFamily.strings())).materialize().iterator()) : Lists
         .newArrayList(pipeline.read(At.avroFile(filePath, Avros.strings())).materialize().iterator());
 

http://git-wip-us.apache.org/repos/asf/crunch/blob/2276ee05/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java b/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
index 272b2af..80b0543 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
@@ -22,6 +22,10 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericContainer;
+import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.crunch.CrunchRuntimeException;
@@ -40,8 +44,10 @@ import org.apache.crunch.impl.mem.collect.MemTable;
 import org.apache.crunch.io.At;
 import org.apache.crunch.io.PathTarget;
 import org.apache.crunch.io.ReadableSource;
+import org.apache.crunch.io.avro.AvroFileTarget;
 import org.apache.crunch.types.PTableType;
 import org.apache.crunch.types.PType;
+import org.apache.crunch.types.avro.ReflectDataFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -180,7 +186,9 @@ public class MemPipeline implements Pipeline {
         FileSystem fs = path.getFileSystem(conf);
         FSDataOutputStream os = fs.create(new Path(path, "out" + outputIndex));
         outputIndex++;
-        if (collection instanceof PTable) {
+        if (target instanceof AvroFileTarget) {
+          writeAvroFile(os, collection.materialize());
+        } else if (collection instanceof PTable) {
           for (Object o : collection.materialize()) {
             Pair p = (Pair) o;
             os.writeBytes(p.first().toString());
@@ -202,6 +210,32 @@ public class MemPipeline implements Pipeline {
     }
   }
 
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  private void writeAvroFile(FSDataOutputStream outputStream, Iterable genericRecords) throws IOException {
+    
+    Object r = genericRecords.iterator().next();
+    
+    Schema schema = null;
+    
+    if (r instanceof GenericContainer) {
+      schema = ((GenericContainer) r).getSchema();
+    } else {
+      schema = new ReflectDataFactory().getReflectData().getSchema(r.getClass());
+    }
+
+    GenericDatumWriter genericDatumWriter = new GenericDatumWriter(schema);
+
+    DataFileWriter dataFileWriter = new DataFileWriter(genericDatumWriter);
+    dataFileWriter.create(schema, outputStream);
+
+    for (Object record : genericRecords) {
+      dataFileWriter.append(record);
+    }
+
+    dataFileWriter.close();
+    outputStream.close();
+  }
+
   @Override
   public PCollection<String> readTextFile(String pathName) {
     return read(At.textFile(pathName));


Mime
View raw message