crunch-commits mailing list archives

From jwi...@apache.org
Subject git commit: CRUNCH-300: Allow MemPipeline to write Avro files by reflection and add more tests for writes done from MemPipeline.
Date Fri, 22 Nov 2013 00:44:41 GMT
Updated Branches:
  refs/heads/master fb172fd84 -> b9e9672d9


CRUNCH-300: Allow MemPipeline to write Avro files by reflection and add more tests for writes done from MemPipeline.

Signed-off-by: Josh Wills <jwills@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/b9e9672d
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/b9e9672d
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/b9e9672d

Branch: refs/heads/master
Commit: b9e9672d9ced2a48db95028b48407b5b2d2a830b
Parents: fb172fd
Author: David Whiting <davw@spotify.com>
Authored: Wed Nov 20 15:53:21 2013 +0100
Committer: Josh Wills <jwills@apache.org>
Committed: Thu Nov 21 16:39:32 2013 -0800

----------------------------------------------------------------------
 .../java/org/apache/crunch/TermFrequencyIT.java |   4 +-
 .../mem/MemPipelineFileReadingWritingIT.java    | 152 +++++++++++++++----
 .../crunch/io/avro/AvroMemPipelineIT.java       |  58 +++----
 .../org/apache/crunch/impl/mem/MemPipeline.java |  59 ++++---
 4 files changed, 183 insertions(+), 90 deletions(-)
----------------------------------------------------------------------
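
For context, the behaviour this commit enables is the one exercised by the new integration tests: a reflection-based record type can be written straight from an in-memory pipeline, with the Avro schema taken from the collection's PType. A minimal sketch of that usage (the class name, SimpleBean, and the output directory are illustrative stand-ins mirroring the new test, not part of this commit):

    import org.apache.crunch.PCollection;
    import org.apache.crunch.impl.mem.MemPipeline;
    import org.apache.crunch.io.To;
    import org.apache.crunch.types.avro.AvroType;
    import org.apache.crunch.types.avro.Avros;

    public class ReflectAvroWriteSketch {
      // Plain bean with a no-arg constructor so Avro reflection can instantiate it.
      public static class SimpleBean {
        public int value;
        public SimpleBean() { }
        public SimpleBean(int value) { this.value = value; }
      }

      public static void main(String[] args) {
        AvroType<SimpleBean> ptype = Avros.reflects(SimpleBean.class);
        PCollection<SimpleBean> beans =
            MemPipeline.typedCollectionOf(ptype, new SimpleBean(1), new SimpleBean(2));
        // The write resolves the schema from the collection's AvroType (see the
        // MemPipeline.java diff below), so reflection-based types can be written from memory.
        MemPipeline.getInstance().write(beans, To.avroFile("/tmp/mem-avro-out"));
        MemPipeline.getInstance().done();
      }
    }

The file lands as out<N>.avro under the target directory, which is the suffix the new tests locate with a "*.avro" wildcard.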


http://git-wip-us.apache.org/repos/asf/crunch/blob/b9e9672d/crunch-core/src/it/java/org/apache/crunch/TermFrequencyIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/TermFrequencyIT.java b/crunch-core/src/it/java/org/apache/crunch/TermFrequencyIT.java
index 2abdb8c..7ede881 100644
--- a/crunch-core/src/it/java/org/apache/crunch/TermFrequencyIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/TermFrequencyIT.java
@@ -74,7 +74,7 @@ public class TermFrequencyIT implements Serializable {
 
     /*
      * Input: String Input title text
-     * 
+     *
      * Output: PTable<Pair<String, String>, Long> Pair<Pair<word, title>, count
      * in title>
      */
@@ -98,7 +98,7 @@ public class TermFrequencyIT implements Serializable {
       /*
        * Input: Pair<Pair<String, String>, Long> Pair<Pair<word, title>, count
        * in title>
-       * 
+       *
        * Output: PTable<String, Pair<String, Long>> PTable<word, Pair<title,
        * count in title>>
        */

http://git-wip-us.apache.org/repos/asf/crunch/blob/b9e9672d/crunch-core/src/it/java/org/apache/crunch/impl/mem/MemPipelineFileReadingWritingIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/impl/mem/MemPipelineFileReadingWritingIT.java b/crunch-core/src/it/java/org/apache/crunch/impl/mem/MemPipelineFileReadingWritingIT.java
index 2d66672..bb75681 100644
--- a/crunch-core/src/it/java/org/apache/crunch/impl/mem/MemPipelineFileReadingWritingIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/impl/mem/MemPipelineFileReadingWritingIT.java
@@ -18,23 +18,32 @@
 package org.apache.crunch.impl.mem;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
 
 import java.io.File;
+import java.io.FilenameFilter;
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.Collection;
+import java.util.Iterator;
 import java.util.List;
 
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.commons.io.filefilter.WildcardFileFilter;
 import org.apache.crunch.PCollection;
 import org.apache.crunch.Pair;
 import org.apache.crunch.Pipeline;
 import org.apache.crunch.Target;
-import org.apache.crunch.impl.mem.collect.MemCollection;
 import org.apache.crunch.impl.mem.collect.MemTable;
 import org.apache.crunch.io.From;
 import org.apache.crunch.io.To;
+import org.apache.crunch.io.avro.AvroFileReaderFactory;
+import org.apache.crunch.test.Person;
 import org.apache.crunch.test.TemporaryPath;
 import org.apache.crunch.test.TemporaryPaths;
+import org.apache.crunch.types.avro.AvroType;
+import org.apache.crunch.types.avro.Avros;
 import org.apache.crunch.types.writable.Writables;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -50,53 +59,57 @@ import org.junit.Test;
 
 import com.google.common.base.Charsets;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import com.google.common.io.Files;
 
 public class MemPipelineFileReadingWritingIT {
   @Rule
   public TemporaryPath baseTmpDir = TemporaryPaths.create();
-  
+
   private File inputFile;
-  private File outputFile;
-  
-  
+  private File outputDir;
+
+
   private static final Collection<String> EXPECTED_COLLECTION = Lists.newArrayList("hello", "world");
   @SuppressWarnings("unchecked")
   private static final Collection<Pair<Integer, String>> EXPECTED_TABLE = Lists.newArrayList(
-                                                        Pair.of(1, "hello"), 
+                                                        Pair.of(1, "hello"),
                                                         Pair.of(2, "world"));
-  
+
 
   @Before
   public void setUp() throws IOException {
-    inputFile = baseTmpDir.getFile("test-read.seq");
-    outputFile = baseTmpDir.getFile("test-write.seq");
+    inputFile = baseTmpDir.getFile("test-read");
+    outputDir = baseTmpDir.getFile("test-write");
+  }
+
+  private File getOutputFile(File outputDir, String wildcardFilter) {
+
+    File[] files = outputDir.listFiles((FilenameFilter)new WildcardFileFilter(wildcardFilter));
+    System.out.println(Arrays.asList(files));
+    assertEquals(1, files.length);
+    return files[0];
   }
 
   @Test
   public void testMemPipelineFileWriter() throws Exception {
-    File tmpDir = baseTmpDir.getFile("mempipe");
+    File outputDir = baseTmpDir.getFile("mempipe");
     Pipeline p = MemPipeline.getInstance();
     PCollection<String> lines = MemPipeline.collectionOf("hello", "world");
-    p.writeTextFile(lines, tmpDir.toString());
+    p.writeTextFile(lines, outputDir.toString());
     p.done();
-    assertTrue(tmpDir.exists());
-    File[] files = tmpDir.listFiles();
-    assertTrue(files != null && files.length > 0);
-    for (File f : files) {
-      if (!f.getName().startsWith(".")) {
-        List<String> txt = Files.readLines(f, Charsets.UTF_8);
-        assertEquals(ImmutableList.of("hello", "world"), txt);
-      }
-    }
+    File outputFile = getOutputFile(outputDir, "*.txt");
+
+    List<String> txt = Files.readLines(outputFile, Charsets.UTF_8);
+    assertEquals(ImmutableList.of("hello", "world"), txt);
   }
 
   private void createTestSequenceFile(final File seqFile) throws IOException {
     SequenceFile.Writer writer = null;
     writer = new Writer(FileSystem.getLocal(baseTmpDir.getDefaultConfiguration()),
-              baseTmpDir.getDefaultConfiguration(), 
-              new Path(seqFile.toString()), 
+              baseTmpDir.getDefaultConfiguration(),
+              new Path(seqFile.toString()),
               IntWritable.class, Text.class);
     writer.append(new IntWritable(1), new Text("hello"));
     writer.append(new IntWritable(2), new Text("world"));
@@ -110,9 +123,9 @@ public class MemPipelineFileReadingWritingIT {
 
     // read from sequence file
     final PCollection<Pair<Integer, String>> readCollection = MemPipeline.getInstance().read(
-      From.sequenceFile(inputFile.toString(), 
+      From.sequenceFile(inputFile.toString(),
         Writables.tableOf(
-          Writables.ints(), 
+          Writables.ints(),
           Writables.strings())));
 
     // assert read same as written.
@@ -123,12 +136,13 @@ public class MemPipelineFileReadingWritingIT {
   public void testMemPipelineWriteSequenceFile_PCollection() throws IOException {
     // write
     PCollection<String> collection = MemPipeline.typedCollectionOf(Writables.strings(), EXPECTED_COLLECTION);
-    final Target target = To.sequenceFile(outputFile.toString());
+    final Target target = To.sequenceFile(outputDir.toString());
     MemPipeline.getInstance().write(collection, target);
 
     // read
     final SequenceFile.Reader reader = new Reader(FileSystem.getLocal(
-      baseTmpDir.getDefaultConfiguration()), new Path(outputFile.toString()),
+      baseTmpDir.getDefaultConfiguration()),
+        new Path(getOutputFile(outputDir, "*.seq").toString()),
         baseTmpDir.getDefaultConfiguration());
     final List<String> actual = Lists.newArrayList();
     final NullWritable key = NullWritable.get();
@@ -147,14 +161,14 @@ public class MemPipelineFileReadingWritingIT {
     // write
     final MemTable<Integer, String> collection = new MemTable<Integer, String>(EXPECTED_TABLE, //
         Writables.tableOf(
-          Writables.ints(), 
+          Writables.ints(),
           Writables.strings()), "test input");
-    final Target target = To.sequenceFile(outputFile.toString());
+    final Target target = To.sequenceFile(outputDir.toString());
     MemPipeline.getInstance().write(collection, target);
 
     // read
     final SequenceFile.Reader reader = new Reader(FileSystem.getLocal(baseTmpDir
-        .getDefaultConfiguration()), new Path(outputFile.toString()),
+        .getDefaultConfiguration()), new Path(getOutputFile(outputDir, "*.seq").toString()),
         baseTmpDir.getDefaultConfiguration());
     final List<Pair<Integer, String>> actual = Lists.newArrayList();
     final IntWritable key = new IntWritable();
@@ -167,4 +181,82 @@ public class MemPipelineFileReadingWritingIT {
     // assert read same as written
     assertEquals(EXPECTED_TABLE, actual);
   }
+
+  @Test
+  public void testMemPipelineWriteAvroFile_SpecificRecords() throws IOException {
+    AvroType<Person> ptype = Avros.specifics(Person.class);
+    PCollection<Person> collection = MemPipeline.typedCollectionOf(
+                                            ptype,
+                                            Person.newBuilder()
+                                              .setName("A")
+                                              .setAge(1)
+                                              .setSiblingnames(ImmutableList.<CharSequence>of())
+                                              .build(),
+                                            Person.newBuilder()
+                                              .setName("B")
+                                              .setAge(2)
+                                              .setSiblingnames(ImmutableList.<CharSequence>of())
+                                              .build());
+
+    MemPipeline.getInstance().write(collection, To.avroFile(outputDir.getPath()));
+
+    Iterator<Person> itr = new AvroFileReaderFactory<Person>(ptype).read(
+              FileSystem.getLocal(baseTmpDir.getDefaultConfiguration()),
+              new Path(getOutputFile(outputDir, "*.avro").getPath()));
+
+    assertEquals(2, Iterators.size(itr));
+
+  }
+
+  @Test
+  public void testMemPipelineWriteAvroFile_ReflectRecords() throws IOException {
+    AvroType<SimpleBean> ptype = Avros.reflects(SimpleBean.class);
+    PCollection<SimpleBean> collection = MemPipeline.typedCollectionOf(
+                                            ptype,
+                                            new SimpleBean(1),
+                                            new SimpleBean(2));
+
+    MemPipeline.getInstance().write(collection, To.avroFile(outputDir.getPath()));
+
+    Iterator<SimpleBean> itr = new AvroFileReaderFactory<SimpleBean>(ptype).read(
+              FileSystem.getLocal(baseTmpDir.getDefaultConfiguration()),
+              new Path(getOutputFile(outputDir, "*.avro").getPath()));
+
+    assertEquals(2, Iterators.size(itr));
+
+  }
+
+  @Test
+  public void testMemPipelineWriteAvroFile_GenericRecords() throws IOException {
+    AvroType<GenericData.Record> ptype = Avros.generics(Person.SCHEMA$);
+    GenericData.Record record = new GenericRecordBuilder(ptype.getSchema())
+                                  .set("name", "A")
+                                  .set("age", 1)
+                                  .set("siblingnames", ImmutableList.of())
+                                  .build();
+    PCollection<GenericData.Record> collection = MemPipeline.typedCollectionOf(
+                                            ptype, record);
+
+    MemPipeline.getInstance().write(collection, To.avroFile(outputDir.getPath()));
+
+    Iterator<GenericData.Record> itr = new AvroFileReaderFactory<GenericData.Record>(ptype).read(
+              FileSystem.getLocal(baseTmpDir.getDefaultConfiguration()),
+              new Path(getOutputFile(outputDir, "*.avro").getPath()));
+
+    assertEquals(record, itr.next());
+    assertFalse(itr.hasNext());
+
+  }
+
+  static class SimpleBean {
+    public int value;
+
+    public SimpleBean() {
+      this(0);
+    }
+
+    public SimpleBean(int value) {
+      this.value = value;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/b9e9672d/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroMemPipelineIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroMemPipelineIT.java b/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroMemPipelineIT.java
index cfb669e..40224e7 100644
--- a/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroMemPipelineIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroMemPipelineIT.java
@@ -5,9 +5,9 @@
  * licenses this file to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
@@ -18,14 +18,13 @@ package org.apache.crunch.io.avro;
 
 import static org.junit.Assert.assertEquals;
 
-import com.google.common.collect.Sets;
 import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.Collections;
 import java.util.List;
-
 import java.util.Set;
+
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericData.Record;
 import org.apache.avro.generic.GenericRecord;
@@ -36,6 +35,7 @@ import org.apache.crunch.io.To;
 import org.apache.crunch.test.Person;
 import org.apache.crunch.test.TemporaryPath;
 import org.apache.crunch.test.TemporaryPaths;
+import org.apache.crunch.types.PType;
 import org.apache.crunch.types.avro.Avros;
 import org.apache.hadoop.fs.Path;
 import org.junit.Before;
@@ -43,6 +43,7 @@ import org.junit.Rule;
 import org.junit.Test;
 
 import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 
 public class AvroMemPipelineIT implements Serializable {
 
@@ -56,11 +57,13 @@ public class AvroMemPipelineIT implements Serializable {
   }
 
   @Test
-  public void testMemPipelienWithSpecificRecord() {
+  public void testMemPipelineWithSpecificRecord() {
 
     Person writeRecord = createSpecificRecord();
 
-    final PCollection<Person> writeCollection = MemPipeline.collectionOf(Collections.singleton(writeRecord));
+    final PCollection<Person> writeCollection = MemPipeline.typedCollectionOf(
+                                                  Avros.specifics(Person.class),
+                                                  writeRecord);
 
     writeCollection.write(To.avroFile(avroFile.getAbsolutePath()));
 
@@ -78,11 +81,15 @@ public class AvroMemPipelineIT implements Serializable {
   }
 
   @Test
-  public void testMemPipelienWithGenericRecord() {
+  public void testMemPipelineWithGenericRecord() {
+
+    PType<GenericData.Record> ptype = Avros.generics(Person.SCHEMA$);
 
-    GenericRecord writeRecord = createGenericRecord();
+    GenericData.Record writeRecord = createGenericRecord("John Doe");
 
-    final PCollection<GenericRecord> writeCollection = MemPipeline.collectionOf(Collections.singleton(writeRecord));
+    final PCollection<GenericData.Record> writeCollection = MemPipeline.typedCollectionOf(
+                                                            ptype,
+                                                            writeRecord);
 
     writeCollection.write(To.avroFile(avroFile.getAbsolutePath()));
 
@@ -94,22 +101,14 @@ public class AvroMemPipelineIT implements Serializable {
     assertEquals(writeRecord, readRecord);
   }
 
-  private GenericRecord createGenericRecord() {
-
-    GenericRecord savedRecord = new GenericData.Record(Person.SCHEMA$);
-    savedRecord.put("name", "John Doe");
-    savedRecord.put("age", 42);
-    savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
-
-    return savedRecord;
-  }
-
   @Test
-  public void testMemPipelienWithReflectionRecord() {
+  public void testMemPipelineWithReflectionRecord() {
 
     String writeRecord = "John Doe";
 
-    final PCollection<String> writeCollection = MemPipeline.collectionOf(Collections.singleton(writeRecord));
+    final PCollection<String> writeCollection = MemPipeline.typedCollectionOf(
+                                                          Avros.strings(),
+                                                          writeRecord);
 
     writeCollection.write(To.avroFile(avroFile.getAbsolutePath()));
 
@@ -124,13 +123,18 @@ public class AvroMemPipelineIT implements Serializable {
   @Test
   public void testMemPipelineWithMultiplePaths() {
 
-    GenericRecord writeRecord1 = createGenericRecord("John Doe");
-    final PCollection<GenericRecord> writeCollection1 = MemPipeline.collectionOf(Collections.singleton(writeRecord1));
+    PType<GenericData.Record> ptype = Avros.generics(Person.SCHEMA$);
+    GenericData.Record writeRecord1 = createGenericRecord("John Doe");
+    final PCollection<GenericData.Record> writeCollection1 = MemPipeline.typedCollectionOf(
+      ptype,
+                                                                      writeRecord1);
     writeCollection1.write(To.avroFile(avroFile.getAbsolutePath()));
 
     File avroFile2 = tmpDir.getFile("test2.avro");
-    GenericRecord writeRecord2 = createGenericRecord("Jane Doe");
-    final PCollection<GenericRecord> writeCollection2 = MemPipeline.collectionOf(Collections.singleton(writeRecord2));
+    GenericData.Record writeRecord2 = createGenericRecord("Jane Doe");
+    final PCollection<GenericData.Record> writeCollection2 = MemPipeline.typedCollectionOf(
+                                                                    ptype,
+                                                                    writeRecord2);
     writeCollection2.write(To.avroFile(avroFile2.getAbsolutePath()));
 
     List<Path> paths = Lists.newArrayList(new Path(avroFile.getAbsolutePath()),
@@ -143,9 +147,9 @@ public class AvroMemPipelineIT implements Serializable {
     assertEquals(Sets.newHashSet(writeRecord1, writeRecord2), readSet);
   }
 
-  private GenericRecord createGenericRecord(String name) {
+  private GenericData.Record createGenericRecord(String name) {
 
-    GenericRecord savedRecord = new GenericData.Record(Person.SCHEMA$);
+    GenericData.Record savedRecord = new GenericData.Record(Person.SCHEMA$);
     savedRecord.put("name", name);
     savedRecord.put("age", 42);
     savedRecord.put("siblingnames", Lists.newArrayList("Jimmy"));
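
A note on the test changes above: because the in-memory Avro write path now reads its schema from the collection's PType, these tests build typed collections with MemPipeline.typedCollectionOf(...) instead of untyped ones. Roughly, assuming the imports from AvroMemPipelineIT and using the generated Person record from these tests (person is an illustrative variable):

    // Before: no PType attached, so the Avro target had to infer the schema from the first record.
    PCollection<Person> untyped = MemPipeline.collectionOf(Collections.singleton(person));

    // After: the AvroType carries the schema that the rewritten writeAvroFile() uses.
    PCollection<Person> typed = MemPipeline.typedCollectionOf(Avros.specifics(Person.class), person);
    typed.write(To.avroFile(avroFile.getAbsolutePath()));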

http://git-wip-us.apache.org/repos/asf/crunch/blob/b9e9672d/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java b/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
index 362763b..5e6dfa0 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
@@ -24,8 +24,6 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
-import com.google.common.util.concurrent.AbstractFuture;
-import org.apache.avro.Schema;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericContainer;
 import org.apache.avro.io.DatumWriter;
@@ -51,6 +49,7 @@ import org.apache.crunch.io.seq.SeqFileTarget;
 import org.apache.crunch.types.Converter;
 import org.apache.crunch.types.PTableType;
 import org.apache.crunch.types.PType;
+import org.apache.crunch.types.avro.AvroType;
 import org.apache.crunch.types.avro.Avros;
 import org.apache.crunch.types.avro.ReflectDataFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -64,6 +63,7 @@ import org.apache.hadoop.mapreduce.Counters;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.AbstractFuture;
 
 public class MemPipeline implements Pipeline {
 
@@ -72,11 +72,11 @@ public class MemPipeline implements Pipeline {
   private static final MemPipeline INSTANCE = new MemPipeline();
 
   private int outputIndex = 0;
-  
+
   public static Counters getCounters() {
     return COUNTERS;
   }
-  
+
   public static void clearCounters() {
     COUNTERS = new CountersWrapper();
   }
@@ -129,7 +129,7 @@ public class MemPipeline implements Pipeline {
 
   private Configuration conf = new Configuration();
   private Set<Target> activeTargets = Sets.newHashSet();
-  
+
   private MemPipeline() {
   }
 
@@ -177,7 +177,7 @@ public class MemPipeline implements Pipeline {
   public void write(PCollection<?> collection, Target target) {
     write(collection, target, Target.WriteMode.DEFAULT);
   }
-  
+
   @Override
   public void write(PCollection<?> collection, Target target, Target.WriteMode writeMode) {
     target.handleExisting(writeMode, -1, getConfiguration());
@@ -193,18 +193,22 @@ public class MemPipeline implements Pipeline {
         FileSystem fs = path.getFileSystem(conf);
         outputIndex++;
         if (target instanceof SeqFileTarget) {
+          Path outputPath = new Path(path, "out" + outputIndex + ".seq");
           if (collection instanceof PTable) {
-            writeSequenceFileFromPTable(fs, path, (PTable<?, ?>) collection);
+            writeSequenceFileFromPTable(fs, outputPath, (PTable<?, ?>) collection);
           } else {
-            writeSequenceFileFromPCollection(fs, path, collection);
+            writeSequenceFileFromPCollection(fs, outputPath, collection);
           }
         } else {
-          FSDataOutputStream os = fs.create(new Path(path, "out" + outputIndex));
           if (target instanceof AvroFileTarget && !(collection instanceof PTable)) {
-
-            writeAvroFile(os, collection.materialize());
+            Path outputPath = new Path(path, "out" + outputIndex + ".avro");
+            FSDataOutputStream os = fs.create(outputPath);
+            writeAvroFile(os, collection);
+            os.close();
           } else {
             LOG.warn("Defaulting to write to a text file from MemPipeline");
+            Path outputPath = new Path(path, "out" + outputIndex + ".txt");
+            FSDataOutputStream os = fs.create(outputPath);
             if (collection instanceof PTable) {
               for (Object o : collection.materialize()) {
                 Pair p = (Pair) o;
@@ -218,8 +222,8 @@ public class MemPipeline implements Pipeline {
                 os.writeBytes(o.toString() + "\r\n");
               }
             }
+            os.close();
           }
-          os.close();
         }
       } catch (IOException e) {
         LOG.error("Exception writing target: " + target, e);
@@ -230,31 +234,24 @@ public class MemPipeline implements Pipeline {
   }
 
   @SuppressWarnings({ "rawtypes", "unchecked" })
-  private void writeAvroFile(FSDataOutputStream outputStream, Iterable genericRecords) throws IOException {
-    
-    Object r = genericRecords.iterator().next();
-    
-    Schema schema = null;
-    
-    if (r instanceof GenericContainer) {
-      schema = ((GenericContainer) r).getSchema();
-    } else {
-      schema = new ReflectDataFactory().getReflectData().getSchema(r.getClass());
-    }
-
-    DatumWriter datumWriter = Avros.newWriter(schema);
+  private void writeAvroFile(FSDataOutputStream outputStream, PCollection recordCollection) throws IOException {
 
+    AvroType avroType = (AvroType)recordCollection.getPType();
+    if (avroType == null) {
+      throw new IllegalStateException("Can't write a non-typed Avro collection");
+    }
+    DatumWriter datumWriter = Avros.newWriter((AvroType)recordCollection.getPType());
     DataFileWriter dataFileWriter = new DataFileWriter(datumWriter);
-    dataFileWriter.create(schema, outputStream);
+    dataFileWriter.create(avroType.getSchema(), outputStream);
 
-    for (Object record : genericRecords) {
+    for (Object record : recordCollection.materialize()) {
       dataFileWriter.append(record);
     }
 
     dataFileWriter.close();
     outputStream.close();
   }
-  
+
   @SuppressWarnings({ "rawtypes", "unchecked" })
   private void writeSequenceFileFromPTable(final FileSystem fs, final Path path, final PTable table)
       throws IOException {
@@ -274,7 +271,7 @@ public class MemPipeline implements Pipeline {
 
     writer.close();
   }
-  
+
   private void writeSequenceFileFromPCollection(final FileSystem fs, final Path path,
       final PCollection collection) throws IOException {
     final PType pType = collection.getPType();
@@ -291,7 +288,7 @@ public class MemPipeline implements Pipeline {
 
     writer.close();
   }
-   
+
   @Override
   public PCollection<String> readTextFile(String pathName) {
     return read(At.textFile(pathName));
@@ -312,7 +309,7 @@ public class MemPipeline implements Pipeline {
     activeTargets.clear();
     return new MemExecution();
   }
-  
+
   @Override
   public PipelineResult run() {
     try {

