tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bi...@apache.org
Subject git commit: TEZ-1351. MROutput needs a flush method to ensure data is materialized for FileOutputCommitter (bikas)
Date Mon, 04 Aug 2014 21:56:25 GMT
Repository: tez
Updated Branches:
  refs/heads/master f5213ee46 -> 5ae48c6ef


TEZ-1351. MROutput needs a flush method to ensure data is materialized for FileOutputCommitter
(bikas)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/5ae48c6e
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/5ae48c6e
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/5ae48c6e

Branch: refs/heads/master
Commit: 5ae48c6ef568d0f56f3a6b6f785579f24ba1f557
Parents: f5213ee
Author: Bikas Saha <bikas@apache.org>
Authored: Mon Aug 4 14:56:15 2014 -0700
Committer: Bikas Saha <bikas@apache.org>
Committed: Mon Aug 4 14:56:15 2014 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 ++
 .../mapreduce/examples/IntersectExample.java    |  9 ++-----
 .../tez/mapreduce/examples/UnionExample.java    | 11 +--------
 .../processor/FilterByWordOutputProcessor.java  |  4 ----
 .../apache/tez/mapreduce/output/MROutput.java   | 25 +++++++++++++-------
 .../apache/tez/mapreduce/processor/MRTask.java  |  5 +++-
 .../mapreduce/processor/SimpleMRProcessor.java  | 13 ++++++++--
 7 files changed, 37 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/5ae48c6e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 4a852e5..072fe55 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -43,6 +43,8 @@ INCOMPATIBLE CHANGES
   TEZ-1041. Use VertexLocationHint consistently everywhere in the API
   TEZ-1057. Replace interfaces with abstract classes for
   Processor/Input/Output classes
+  TEZ-1351. MROutput needs a flush method to ensure data is materialized for
+  FileOutputCommitter
 
 Release 0.4.0-incubating: 2014-04-05
 

http://git-wip-us.apache.org/repos/asf/tez/blob/5ae48c6e/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectExample.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectExample.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectExample.java
index 08a8029..ef7643a 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectExample.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectExample.java
@@ -59,6 +59,7 @@ import org.apache.tez.mapreduce.common.MRInputAMSplitGenerator;
 import org.apache.tez.mapreduce.hadoop.MRHelpers;
 import org.apache.tez.mapreduce.input.MRInput;
 import org.apache.tez.mapreduce.output.MROutput;
+import org.apache.tez.mapreduce.processor.SimpleMRProcessor;
 import org.apache.tez.runtime.api.LogicalInput;
 import org.apache.tez.runtime.api.LogicalOutput;
 import org.apache.tez.runtime.api.Reader;
@@ -282,7 +283,7 @@ public class IntersectExample extends Configured implements Tool {
     }
   }
 
-  public static class IntersectProcessor extends SimpleProcessor {
+  public static class IntersectProcessor extends SimpleMRProcessor {
 
     public IntersectProcessor(TezProcessorContext context) {
       super(context);
@@ -316,12 +317,6 @@ public class IntersectExample extends Configured implements Tool {
           writer.write(key, NullWritable.get());
         }
       }
-
-      LOG.info("Completed Processing. Trying to commit");
-      while (!getContext().canCommit()) {
-        Thread.sleep(100l);
-      }
-      output.commit();
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/5ae48c6e/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java
index 1fdd1f3..4455bb4 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java
@@ -67,14 +67,13 @@ import org.apache.tez.runtime.library.api.KeyValuesReader;
 import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfigurer;
 import org.apache.tez.runtime.library.input.ConcatenatedMergedKeyValuesInput;
 import org.apache.tez.runtime.library.partitioner.HashPartitioner;
-import org.apache.tez.runtime.library.processor.SimpleProcessor;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
 
 public class UnionExample {
 
-  public static class TokenProcessor extends SimpleProcessor {
+  public static class TokenProcessor extends SimpleMRProcessor {
     IntWritable one = new IntWritable(1);
     Text word = new Text();
 
@@ -111,14 +110,6 @@ public class UnionExample {
           }
         }
       }
-      if (inUnion) {
-        if (parts.isCommitRequired()) {
-          while (!getContext().canCommit()) {
-            Thread.sleep(100);
-          }
-          parts.commit();
-        }
-      }
     }
 
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/5ae48c6e/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordOutputProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordOutputProcessor.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordOutputProcessor.java
index e277839..c9247f4 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordOutputProcessor.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordOutputProcessor.java
@@ -44,10 +44,6 @@ public class FilterByWordOutputProcessor extends SimpleMRProcessor {
 
 
   @Override
-  public void initialize() throws Exception {
-  }
-
-  @Override
   public void handleEvents(List<Event> processorEvents) {
     throw new UnsupportedOperationException("Not expecting any events to the broadcast output processor");
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/5ae48c6e/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
index ed473e7..bda3f1b 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
@@ -60,7 +60,7 @@ public class MROutput extends AbstractLogicalOutput {
   
   private JobConf jobConf;
   boolean useNewApi;
-  private AtomicBoolean closed = new AtomicBoolean(false);
+  private AtomicBoolean flushed = new AtomicBoolean(false);
 
   @SuppressWarnings("rawtypes")
   org.apache.hadoop.mapreduce.OutputFormat newOutputFormat;
@@ -272,11 +272,21 @@ public class MROutput extends AbstractLogicalOutput {
 
   @Override
   public synchronized List<Event> close() throws IOException {
-    if (closed.getAndSet(true)) {
-      return null;
+    flush();
+    return null;
+  }
+  
+  /**
+   * Call this in the processor before finishing to ensure outputs that 
+   * outputs have been flushed. Must be called before commit.
+   * @throws IOException
+   */
+  public void flush() throws IOException {
+    if (flushed.getAndSet(true)) {
+      return;
     }
 
-    LOG.info("Closing Simple Output");
+    LOG.info("Flushing Simple Output");
     if (useNewApi) {
       try {
         newRecordWriter.close(newApiTaskAttemptContext);
@@ -286,8 +296,7 @@ public class MROutput extends AbstractLogicalOutput {
     } else {
       oldRecordWriter.close(null);
     }
-    LOG.info("Closed Simple Output");
-    return null;
+    LOG.info("Flushed Simple Output");
   }
 
   /**
@@ -296,7 +305,7 @@ public class MROutput extends AbstractLogicalOutput {
    * @throws IOException
    */
   public void commit() throws IOException {
-    close();
+    flush();
     if (useNewApi) {
       committer.commitTask(newApiTaskAttemptContext);
     } else {
@@ -311,7 +320,7 @@ public class MROutput extends AbstractLogicalOutput {
    * @throws IOException
    */
   public void abort() throws IOException {
-    close();
+    flush();
     if (useNewApi) {
       committer.abortTask(newApiTaskAttemptContext);
     } else {

http://git-wip-us.apache.org/repos/asf/tez/blob/5ae48c6e/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
index d867107..05eea2b 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
@@ -449,7 +449,10 @@ public abstract class MRTask extends AbstractLogicalIOProcessor {
     // task can Commit now
     try {
       LOG.info("Task " + taskAttemptId + " is allowed to commit now");
-      output.commit();
+      output.flush();
+      if (output.isCommitRequired()) {
+        output.commit();
+      }
       return;
     } catch (IOException iee) {
       LOG.warn("Failure committing: " +

http://git-wip-us.apache.org/repos/asf/tez/blob/5ae48c6e/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/SimpleMRProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/SimpleMRProcessor.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/SimpleMRProcessor.java
index 23877de..e2a900a 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/SimpleMRProcessor.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/SimpleMRProcessor.java
@@ -43,11 +43,20 @@ public abstract class SimpleMRProcessor extends SimpleProcessor {
     }
     List<MROutput> mrOuts = Lists.newLinkedList();
     for (LogicalOutput output : getOutputs().values()) {
-      if ((output instanceof MROutput) && (((MROutput) output).isCommitRequired())) {
-        mrOuts.add((MROutput) output);
+      if (output instanceof MROutput) {
+        MROutput mrOutput = (MROutput) output;
+        mrOutput.flush();
+        if (mrOutput.isCommitRequired()) {
+          mrOuts.add((MROutput) output);
+        }
       }
     }
     if (mrOuts.size() > 0) {
+      // This will loop till the AM asks for the task to be killed. As
+      // against, the AM sending a signal to the task to kill itself
+      // gracefully. The AM waits for the current committer to successfully
+      // complete and then kills us. Until then we wait in case the
+      // current committer fails and we get chosen to commit.
       while (!getContext().canCommit()) {
         Thread.sleep(100);
       }


Mime
View raw message