crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jwi...@apache.org
Subject git commit: CRUNCH-479: Properly handle materialization of APPEND-style PCollections
Date Tue, 28 Oct 2014 11:42:40 GMT
Repository: crunch
Updated Branches:
  refs/heads/master 82cecc0ce -> 253326148


CRUNCH-479: Properly handle materialization of APPEND-style PCollections


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/25332614
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/25332614
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/25332614

Branch: refs/heads/master
Commit: 25332614830ca61df05d2f68074f4462b8bdf1d4
Parents: 82cecc0
Author: Josh Wills <jwills@apache.org>
Authored: Mon Oct 27 13:43:28 2014 -0700
Committer: Josh Wills <jwills@apache.org>
Committed: Tue Oct 28 04:39:44 2014 -0700

----------------------------------------------------------------------
 .../crunch/io/avro/AvroFileSourceTargetIT.java  | 70 +++++++++++++++++---
 .../crunch/impl/dist/DistributedPipeline.java   | 10 ++-
 .../org/apache/crunch/impl/mr/MRPipeline.java   |  2 +-
 .../apache/crunch/impl/mr/exec/MRExecutor.java  |  5 +-
 .../apache/crunch/impl/mr/plan/MSCRPlanner.java |  7 +-
 5 files changed, 79 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/25332614/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroFileSourceTargetIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroFileSourceTargetIT.java
b/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroFileSourceTargetIT.java
index 9f51f23..511e827 100644
--- a/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroFileSourceTargetIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroFileSourceTargetIT.java
@@ -34,9 +34,11 @@ import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.reflect.ReflectData;
 import org.apache.crunch.PCollection;
 import org.apache.crunch.Pipeline;
+import org.apache.crunch.Target;
 import org.apache.crunch.impl.mr.MRPipeline;
 import org.apache.crunch.io.At;
 import org.apache.crunch.io.From;
+import org.apache.crunch.io.To;
 import org.apache.crunch.test.Person;
 import org.apache.crunch.test.StringWrapper;
 import org.apache.crunch.test.TemporaryPath;
@@ -58,11 +60,15 @@ public class AvroFileSourceTargetIT implements Serializable {
 
   @Before
   public void setUp() throws IOException {
-    avroFile = tmpDir.getFile("test.avro");
+    avroFile = getTmpFile("test.avro");
   }
 
-  private void populateGenericFile(List<GenericRecord> genericRecords, Schema schema)
throws IOException {
-    FileOutputStream outputStream = new FileOutputStream(this.avroFile);
+  private File getTmpFile(String file){
+      return tmpDir.getFile(file);
+  }
+
+  private void populateGenericFile(File outFile, List<GenericRecord> genericRecords,
Schema schema) throws IOException {
+    FileOutputStream outputStream = new FileOutputStream(outFile);
     GenericDatumWriter<GenericRecord> genericDatumWriter = new GenericDatumWriter<GenericRecord>(schema);
 
     DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<GenericRecord>(genericDatumWriter);
@@ -83,7 +89,7 @@ public class AvroFileSourceTargetIT implements Serializable {
     savedRecord.put("name", "John Doe");
     savedRecord.put("age", 42);
     savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
-    populateGenericFile(Lists.newArrayList(savedRecord), Person.SCHEMA$);
+    populateGenericFile(avroFile, Lists.newArrayList(savedRecord), Person.SCHEMA$);
 
     Pipeline pipeline = new MRPipeline(AvroFileSourceTargetIT.class, tmpDir.getDefaultConfiguration());
     PCollection<Person> genericCollection = pipeline.read(At.avroFile(avroFile.getAbsolutePath(),
@@ -92,15 +98,59 @@ public class AvroFileSourceTargetIT implements Serializable {
     List<Person> personList = Lists.newArrayList(genericCollection.materialize());
 
     Person expectedPerson = new Person();
-    expectedPerson.name = "John Doe";
-    expectedPerson.age = 42;
+    expectedPerson.setName("John Doe");
+    expectedPerson.setAge(42);
 
     List<CharSequence> siblingNames = Lists.newArrayList();
     siblingNames.add("Jimmy");
     siblingNames.add("Jane");
-    expectedPerson.siblingnames = siblingNames;
+    expectedPerson.setSiblingnames(siblingNames);
+
+    assertEquals(Lists.newArrayList(expectedPerson), Lists.newArrayList(personList));
+  }
+
+  @Test
+  public void testMaterializeAppendMode() throws IOException {
+    File parentPath = getTmpFile("existing");
+    parentPath.mkdir();
+    File existingRecordsFile = new File(parentPath, "test.avro");
+    GenericRecord savedRecord = new GenericData.Record(Person.SCHEMA$);
+    savedRecord.put("name", "John Doe");
+    savedRecord.put("age", 42);
+    savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
+    populateGenericFile(existingRecordsFile, Lists.newArrayList(savedRecord), Person.SCHEMA$);
+
+    GenericRecord secondRecord = new GenericData.Record(Person.SCHEMA$);
+    secondRecord.put("name", "Admiral Ackbar");
+    secondRecord.put("age", 37);
+    secondRecord.put("siblingnames", Lists.newArrayList("Itsa", "Trap"));
+
+    File newRecordsParent = getTmpFile("new");
+    newRecordsParent.mkdir();
+    File newRecordsFile = new File(newRecordsParent, "test.avro");
+    populateGenericFile(newRecordsFile, Lists.newArrayList(secondRecord), Person.SCHEMA$);
+
+    Pipeline pipeline = new MRPipeline(AvroFileSourceTargetIT.class, tmpDir.getDefaultConfiguration());
+    PCollection<Person> people = pipeline.read(At.avroFile(newRecordsParent.getAbsolutePath(),
+            Avros.records(Person.class)));
+
+    pipeline.write(people, To.avroFile(parentPath.getAbsolutePath()), Target.WriteMode.APPEND);
+    pipeline.run();
+
+    List<Person> personList = Lists.newArrayList(people.materialize());
+
+    Person expectedPerson = new Person();
+    expectedPerson.setName("Admiral Ackbar");
+    expectedPerson.setAge(37);
+
+    List<CharSequence> siblingNames = Lists.newArrayList();
+    siblingNames.add("Itsa");
+    siblingNames.add("Trap");
+    expectedPerson.setSiblingnames(siblingNames);
 
     assertEquals(Lists.newArrayList(expectedPerson), Lists.newArrayList(personList));
+
+    pipeline.done();
   }
 
   @Test
@@ -109,7 +159,7 @@ public class AvroFileSourceTargetIT implements Serializable {
     savedRecord.put("name", "John Doe");
     savedRecord.put("age", 42);
     savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
-    populateGenericFile(Lists.newArrayList(savedRecord), Person.SCHEMA$);
+    populateGenericFile(avroFile, Lists.newArrayList(savedRecord), Person.SCHEMA$);
 
     Pipeline pipeline = new MRPipeline(AvroFileSourceTargetIT.class, tmpDir.getDefaultConfiguration());
     PCollection<GenericData.Record> genericCollection = pipeline.read(From.avroFile(
@@ -129,7 +179,7 @@ public class AvroFileSourceTargetIT implements Serializable {
     savedRecord.put("name", "John Doe");
     savedRecord.put("age", 42);
     savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
-    populateGenericFile(Lists.newArrayList(savedRecord), genericPersonSchema);
+    populateGenericFile(avroFile, Lists.newArrayList(savedRecord), genericPersonSchema);
 
     Pipeline pipeline = new MRPipeline(AvroFileSourceTargetIT.class, tmpDir.getDefaultConfiguration());
     PCollection<Record> genericCollection = pipeline.read(At.avroFile(avroFile.getAbsolutePath(),
@@ -145,7 +195,7 @@ public class AvroFileSourceTargetIT implements Serializable {
     Schema pojoPersonSchema = ReflectData.get().getSchema(StringWrapper.class);
     GenericRecord savedRecord = new GenericData.Record(pojoPersonSchema);
     savedRecord.put("value", "stringvalue");
-    populateGenericFile(Lists.newArrayList(savedRecord), pojoPersonSchema);
+    populateGenericFile(avroFile, Lists.newArrayList(savedRecord), pojoPersonSchema);
 
     Pipeline pipeline = new MRPipeline(AvroFileSourceTargetIT.class, tmpDir.getDefaultConfiguration());
     PCollection<StringWrapper> stringValueCollection = pipeline.read(At.avroFile(avroFile.getAbsolutePath(),

http://git-wip-us.apache.org/repos/asf/crunch/blob/25332614/crunch-core/src/main/java/org/apache/crunch/impl/dist/DistributedPipeline.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/dist/DistributedPipeline.java
b/crunch-core/src/main/java/org/apache/crunch/impl/dist/DistributedPipeline.java
index 2d236ae..64510f4 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/dist/DistributedPipeline.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/dist/DistributedPipeline.java
@@ -70,6 +70,7 @@ public abstract class DistributedPipeline implements Pipeline {
   protected final Map<PCollectionImpl<?>, Set<Target>> outputTargets;
   protected final Map<PCollectionImpl<?>, MaterializableIterable<?>> outputTargetsToMaterialize;
   protected final Map<PipelineCallable<?>, Set<Target>> allPipelineCallables;
+  protected final Set<Target> appendedTargets;
   private Path tempDirectory;
   private int tempFileIndex;
   private int nextAnonymousStageId;
@@ -89,6 +90,7 @@ public abstract class DistributedPipeline implements Pipeline {
     this.outputTargets = Maps.newHashMap();
     this.outputTargetsToMaterialize = Maps.newHashMap();
     this.allPipelineCallables = Maps.newHashMap();
+    this.appendedTargets = Sets.newHashSet();
     this.conf = conf;
     this.tempDirectory = createTempDirectory(conf);
     this.tempFileIndex = 0;
@@ -191,6 +193,12 @@ public abstract class DistributedPipeline implements Pipeline {
       throw new CrunchRuntimeException("Target " + target + " is already written in current
run." +
           " Use WriteMode.APPEND in order to write additional data to it.");
     }
+
+    // Need special handling for append targets in the case of materialization
+    if (writeMode == Target.WriteMode.APPEND) {
+      appendedTargets.add(target);
+    }
+
     addOutput((PCollectionImpl<?>) pcollection, target);
   }
 
@@ -264,7 +272,7 @@ public abstract class DistributedPipeline implements Pipeline {
     ReadableSourceTarget<T> srcTarget = null;
     if (outputTargets.containsKey(pcollection)) {
       for (Target target : outputTargets.get(impl)) {
-        if (target instanceof ReadableSourceTarget) {
+        if (target instanceof ReadableSourceTarget && !appendedTargets.contains(target))
{
           return (ReadableSourceTarget<T>) target;
         }
       }

http://git-wip-us.apache.org/repos/asf/crunch/blob/25332614/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
index 1c48c62..d23988b 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
@@ -106,7 +106,7 @@ public class MRPipeline extends DistributedPipeline {
         outputTargetsToMaterialize.remove(c);
       }
     }
-    MSCRPlanner planner = new MSCRPlanner(this, outputTargets, toMaterialize, allPipelineCallables);
+    MSCRPlanner planner = new MSCRPlanner(this, outputTargets, toMaterialize, appendedTargets,
allPipelineCallables);
     try {
       return planner.plan(jarClass, getConfiguration());
     } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/crunch/blob/25332614/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
index 2cefc04..024fcce 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
@@ -59,6 +59,7 @@ public class MRExecutor extends AbstractFuture<PipelineResult> implements
MRPipe
   private final CrunchJobControl control;
   private final Map<PCollectionImpl<?>, Set<Target>> outputTargets;
   private final Map<PCollectionImpl<?>, MaterializableIterable> toMaterialize;
+  private final Set<Target> appendedTargets;
   private final CountDownLatch doneSignal = new CountDownLatch(1);
   private final CountDownLatch killSignal = new CountDownLatch(1);
   private final CappedExponentialCounter pollInterval;
@@ -74,10 +75,12 @@ public class MRExecutor extends AbstractFuture<PipelineResult> implements
MRPipe
       Class<?> jarClass,
       Map<PCollectionImpl<?>, Set<Target>> outputTargets,
       Map<PCollectionImpl<?>, MaterializableIterable> toMaterialize,
+      Set<Target> appendedTargets,
       Map<PipelineCallable<?>, Set<Target>> pipelineCallables) {
     this.control = new CrunchJobControl(conf, jarClass.toString(), pipelineCallables);
     this.outputTargets = outputTargets;
     this.toMaterialize = toMaterialize;
+    this.appendedTargets = appendedTargets;
     this.monitorThread = new Thread(new Runnable() {
       @Override
       public void run() {
@@ -147,7 +150,7 @@ public class MRExecutor extends AbstractFuture<PipelineResult> implements
MRPipe
           } else {
             boolean materialized = false;
             for (Target t : outputTargets.get(c)) {
-              if (!materialized) {
+              if (!materialized && !appendedTargets.contains(t)) {
                 if (t instanceof SourceTarget) {
                   c.materializeAt((SourceTarget) t);
                   materialized = true;

http://git-wip-us.apache.org/repos/asf/crunch/blob/25332614/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
index 70acb59..91e3036 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
@@ -53,16 +53,19 @@ public class MSCRPlanner {
   private final MRPipeline pipeline;
   private final Map<PCollectionImpl<?>, Set<Target>> outputs;
   private final Map<PCollectionImpl<?>, MaterializableIterable> toMaterialize;
+  private final Set<Target> appendedTargets;
   private final Map<PipelineCallable<?>, Set<Target>> pipelineCallables;
   private int lastJobID = 0;
 
   public MSCRPlanner(MRPipeline pipeline, Map<PCollectionImpl<?>, Set<Target>>
outputs,
       Map<PCollectionImpl<?>, MaterializableIterable> toMaterialize,
+      Set<Target> appendedTargets,
       Map<PipelineCallable<?>, Set<Target>> pipelineCallables) {
     this.pipeline = pipeline;
     this.outputs = new TreeMap<PCollectionImpl<?>, Set<Target>>(DEPTH_COMPARATOR);
     this.outputs.putAll(outputs);
     this.toMaterialize = toMaterialize;
+    this.appendedTargets = appendedTargets;
     this.pipelineCallables = pipelineCallables;
   }
 
@@ -117,7 +120,7 @@ public class MSCRPlanner {
       }
       if (!hasInputs) {
         LOG.warn("No input sources for pipeline, nothing to do...");
-        return new MRExecutor(conf, jarClass, outputs, toMaterialize, pipelineCallables);
+        return new MRExecutor(conf, jarClass, outputs, toMaterialize, appendedTargets, pipelineCallables);
       }
 
       // Create a new graph that splits up up dependent GBK nodes.
@@ -191,7 +194,7 @@ public class MSCRPlanner {
     
     // Finally, construct the jobs from the prototypes and return.
     DotfileWriter dotfileWriter = new DotfileWriter();
-    MRExecutor exec = new MRExecutor(conf, jarClass, outputs, toMaterialize, pipelineCallables);
+    MRExecutor exec = new MRExecutor(conf, jarClass, outputs, toMaterialize, appendedTargets,
pipelineCallables);
     for (JobPrototype proto : Sets.newHashSet(assignments.values())) {
       dotfileWriter.addJobPrototype(proto);
       exec.addJob(proto.getCrunchJob(jarClass, conf, pipeline, lastJobID));


Mime
View raw message