crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tomwh...@apache.org
Subject crunch git commit: CRUNCH-481. Support independent output committers for multiple outputs.
Date Fri, 13 Feb 2015 14:50:46 GMT
Repository: crunch
Updated Branches:
  refs/heads/master dbd56e638 -> fcf901cbb


CRUNCH-481. Support independent output committers for multiple outputs.

Re-added after this was inadvertently dropped, and updated to fix null
job ID.


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/fcf901cb
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/fcf901cb
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/fcf901cb

Branch: refs/heads/master
Commit: fcf901cbb437faf65cae6d1fb4c3513084fe4186
Parents: dbd56e6
Author: Tom White <tomwhite@apache.org>
Authored: Fri Feb 13 14:48:38 2015 +0000
Committer: Tom White <tomwhite@apache.org>
Committed: Fri Feb 13 14:48:38 2015 +0000

----------------------------------------------------------------------
 .../crunch/impl/mr/plan/JobPrototype.java       |   2 +
 .../crunch/impl/mr/run/CrunchOutputFormat.java  |  54 ++++
 .../org/apache/crunch/io/CrunchOutputs.java     | 292 ++++++++++++++++---
 3 files changed, 306 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/fcf901cb/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java
index d341184..2863e00 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java
@@ -34,6 +34,7 @@ import org.apache.crunch.impl.mr.exec.CrunchJobHooks;
 import org.apache.crunch.impl.mr.run.CrunchCombiner;
 import org.apache.crunch.impl.mr.run.CrunchInputFormat;
 import org.apache.crunch.impl.mr.run.CrunchMapper;
+import org.apache.crunch.impl.mr.run.CrunchOutputFormat;
 import org.apache.crunch.impl.mr.run.CrunchReducer;
 import org.apache.crunch.impl.mr.run.NodeContext;
 import org.apache.crunch.impl.mr.run.RTNode;
@@ -214,6 +215,7 @@ class JobPrototype {
       job.setNumReduceTasks(0);
       inputNodes = Lists.newArrayList(outputNodes);
     }
+    job.setOutputFormatClass(CrunchOutputFormat.class);
     serialize(inputNodes, conf, workingPath, NodeContext.MAP);
 
     if (inputNodes.size() == 1) {

http://git-wip-us.apache.org/repos/asf/crunch/blob/fcf901cb/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchOutputFormat.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchOutputFormat.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchOutputFormat.java
new file mode 100644
index 0000000..bd9cdc9
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchOutputFormat.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.run;
+
+import org.apache.crunch.io.CrunchOutputs;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+
+public class CrunchOutputFormat<K, V> extends OutputFormat<K, V> {
+  @Override
+  public RecordWriter<K, V> getRecordWriter(TaskAttemptContext taskAttemptContext)
+      throws IOException, InterruptedException {
+    return new RecordWriter<K, V>() {
+      @Override
+      public void write(K k, V v) throws IOException, InterruptedException {
+      }
+
+      @Override
+      public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
+      }
+    };
+  }
+
+  @Override
+  public void checkOutputSpecs(JobContext jobContext) throws IOException, InterruptedException {
+    CrunchOutputs.checkOutputSpecs(jobContext);
+  }
+
+  @Override
+  public OutputCommitter getOutputCommitter(TaskAttemptContext taskAttemptContext)
+      throws IOException, InterruptedException {
+    return CrunchOutputs.getOutputCommitter(taskAttemptContext);
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/fcf901cb/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java b/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java
index e811bcf..57fe139 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java
@@ -17,14 +17,24 @@
  */
 package org.apache.crunch.io;
 
+import com.google.common.collect.Sets;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
 import org.apache.crunch.CrunchRuntimeException;
 import org.apache.crunch.hadoop.mapreduce.TaskAttemptContextFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.TaskAttemptID;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
 import org.apache.hadoop.util.ReflectionUtils;
 
 import com.google.common.base.Joiner;
@@ -35,6 +45,8 @@ import com.google.common.collect.Maps;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
 
 /**
  * An analogue of {@link CrunchInputs} for handling multiple {@code OutputFormat} instances
@@ -64,6 +76,31 @@ public class CrunchOutputs<K, V> {
     conf.set(CRUNCH_OUTPUTS, existing == null ? inputs : existing + RECORD_SEP + inputs);
   }
 
+  public static void checkOutputSpecs(JobContext jc) throws IOException, InterruptedException {
+    Map<String, OutputConfig> outputs = getNamedOutputs(jc.getConfiguration());
+    for (Map.Entry<String, OutputConfig> e : outputs.entrySet()) {
+      String namedOutput = e.getKey();
+      Job job = getJob(jc.getJobID(), e.getKey(), jc.getConfiguration());
+      OutputFormat fmt = getOutputFormat(namedOutput, job, e.getValue());
+      fmt.checkOutputSpecs(jc);
+    }
+  }
+
+  public static OutputCommitter getOutputCommitter(TaskAttemptContext tac) throws IOException, InterruptedException {
+    Map<String, OutputConfig> outputs = getNamedOutputs(tac.getConfiguration());
+    Map<String, OutputCommitter> committers = Maps.newHashMap();
+    for (Map.Entry<String, OutputConfig> e : outputs.entrySet()) {
+      String namedOutput = e.getKey();
+      Job job = getJob(tac.getJobID(), e.getKey(), tac.getConfiguration());
+      OutputFormat fmt = getOutputFormat(namedOutput, job, e.getValue());
+      TaskAttemptContext taskContext = TaskAttemptContextFactory.create(
+          job.getConfiguration(), tac.getTaskAttemptID());
+      OutputCommitter oc = fmt.getOutputCommitter(taskContext);
+      committers.put(namedOutput, oc);
+    }
+    return new CompositeOutputCommitter(outputs, committers);
+  }
+
   public static class OutputConfig<K, V> {
     public FormatBundle<OutputFormat<K, V>> bundle;
     public Class<K> keyClass;
@@ -84,6 +121,10 @@ public class CrunchOutputs<K, V> {
 
   public static Map<String, OutputConfig> getNamedOutputs(Configuration conf) {
     Map<String, OutputConfig> out = Maps.newHashMap();
+    String serOut = conf.get(CRUNCH_OUTPUTS);
+    if (serOut == null || serOut.isEmpty()) {
+      return out;
+    }
     for (String input : Splitter.on(RECORD_SEP).split(conf.get(CRUNCH_OUTPUTS))) {
       List<String> fields = Lists.newArrayList(SPLITTER.split(input));
       String name = fields.get(0);
@@ -101,10 +142,10 @@ public class CrunchOutputs<K, V> {
   private static final String BASE_OUTPUT_NAME = "mapreduce.output.basename";
   private static final String COUNTERS_GROUP = CrunchOutputs.class.getName();
 
-  private final TaskInputOutputContext<?, ?, K, V> baseContext;
+  private TaskInputOutputContext<?, ?, K, V> baseContext;
+  private Configuration baseConf;
   private final Map<String, OutputConfig> namedOutputs;
-  private final Map<String, RecordWriter<K, V>> recordWriters;
-  private final Map<String, TaskAttemptContext> taskContextCache;
+  private final Map<String, OutputState<K, V>> outputStates;
   private final boolean disableOutputCounters;
 
   /**
@@ -114,11 +155,15 @@ public class CrunchOutputs<K, V> {
    * @param context the TaskInputOutputContext object
    */
   public CrunchOutputs(TaskInputOutputContext<?, ?, K, V> context) {
+    this(context.getConfiguration());
     this.baseContext = context;
-    namedOutputs = getNamedOutputs(context);
-    recordWriters = Maps.newHashMap();
-    taskContextCache = Maps.newHashMap();
-    this.disableOutputCounters = context.getConfiguration().getBoolean(CRUNCH_DISABLE_OUTPUT_COUNTERS, false);
+  }
+
+  public CrunchOutputs(Configuration conf) {
+    this.baseConf = conf;
+    this.namedOutputs = getNamedOutputs(conf);
+    this.outputStates = Maps.newHashMap();
+    this.disableOutputCounters = conf.getBoolean(CRUNCH_DISABLE_OUTPUT_COUNTERS, false);
   }
 
   @SuppressWarnings("unchecked")
@@ -128,63 +173,226 @@ public class CrunchOutputs<K, V> {
       throw new IllegalArgumentException("Undefined named output '" +
         namedOutput + "'");
     }
-    TaskAttemptContext taskContext = getContext(namedOutput);
     if (!disableOutputCounters) {
       baseContext.getCounter(COUNTERS_GROUP, namedOutput).increment(1);
     }
-    getRecordWriter(taskContext, namedOutput).write(key, value);
+    getOutputState(namedOutput).write(key, value);
   }
   
   public void close() throws IOException, InterruptedException {
-    for (RecordWriter<?, ?> writer : recordWriters.values()) {
-      writer.close(baseContext);
+    for (OutputState<?, ?> out : outputStates.values()) {
+      out.close();
     }
   }
-  
-  private TaskAttemptContext getContext(String nameOutput) throws IOException {
-    TaskAttemptContext taskContext = taskContextCache.get(nameOutput);
-    if (taskContext != null) {
-      return taskContext;
+
+  private OutputState<K, V> getOutputState(String namedOutput) throws IOException, InterruptedException {
+    OutputState<?, ?> out = outputStates.get(namedOutput);
+    if (out != null) {
+      return (OutputState<K, V>) out;
     }
 
     // The following trick leverages the instantiation of a record writer via
     // the job thus supporting arbitrary output formats.
-    OutputConfig outConfig = namedOutputs.get(nameOutput);
-    Configuration conf = new Configuration(baseContext.getConfiguration());
-    Job job = new Job(conf);
-    job.getConfiguration().set("crunch.namedoutput", nameOutput);
+    Job job = getJob(baseContext.getJobID(), namedOutput, baseConf);
+
+    // Get a job with the expected named output.
+    job = getJob(job.getJobID(), namedOutput,baseConf);
+
+    OutputFormat<K, V> fmt = getOutputFormat(namedOutput, job, namedOutputs.get(namedOutput));
+    TaskAttemptContext taskContext = null;
+    RecordWriter<K, V> recordWriter = null;
+
+    if (baseContext != null) {
+      taskContext = getTaskContext(baseContext, job);
+
+      recordWriter = fmt.getRecordWriter(taskContext);
+    }
+    OutputState<K, V> outputState = new OutputState(taskContext, recordWriter);
+    this.outputStates.put(namedOutput, outputState);
+    return outputState;
+  }
+
+  private static Job getJob(JobID jobID, String namedOutput, Configuration baseConf)
+      throws IOException {
+    Job job = new Job(new Configuration(baseConf));
+    job.getConfiguration().set("crunch.namedoutput", namedOutput);
+    setJobID(job, jobID, namedOutput);
+    return job;
+  }
+
+  private static TaskAttemptContext getTaskContext(TaskAttemptContext baseContext, Job job) {
+
+    org.apache.hadoop.mapreduce.TaskAttemptID baseTaskId = baseContext.getTaskAttemptID();
+
+    // Create a task ID context with our specialized job ID.
+    org.apache.hadoop.mapreduce.TaskAttemptID  taskId;
+    taskId = new org.apache.hadoop.mapreduce.TaskAttemptID(job.getJobID().getJtIdentifier(),
+            job.getJobID().getId(),
+            baseTaskId.isMap(),
+            baseTaskId.getTaskID().getId(),
+            baseTaskId.getId());
+
+    return TaskAttemptContextFactory.create(
+            job.getConfiguration(), taskId);
+  }
+
+  private static void setJobID(Job job, JobID jobID, String namedOutput) {
+    Method setJobIDMethod;
+    JobID newJobID = jobID;
+    try {
+      // Hadoop 2
+      setJobIDMethod = Job.class.getMethod("setJobID", JobID.class);
+      // Add the named output to the job ID, since that is used by some output formats
+      // to create temporary outputs.
+      newJobID = jobID == null || jobID.getJtIdentifier().contains(namedOutput) ?
+          jobID :
+          new JobID(jobID.getJtIdentifier() + "_" + namedOutput, jobID.getId());
+    } catch (NoSuchMethodException e) {
+      // Hadoop 1's setJobID method is package private and declared by JobContext
+      try {
+        setJobIDMethod = JobContext.class.getDeclaredMethod("setJobID", JobID.class);
+      } catch (NoSuchMethodException e1) {
+        throw new CrunchRuntimeException(e);
+      }
+      setJobIDMethod.setAccessible(true);
+    }
+    try {
+      setJobIDMethod.invoke(job, newJobID);
+    } catch (Exception e) {
+      throw new CrunchRuntimeException("Could not set job ID to " + jobID, e);
+    }
+  }
+
+  private static void configureJob(
+      String namedOutput,
+      Job job,
+      OutputConfig outConfig) throws IOException {
+    job.getConfiguration().set(BASE_OUTPUT_NAME, namedOutput);
     job.setOutputFormatClass(outConfig.bundle.getFormatClass());
     job.setOutputKeyClass(outConfig.keyClass);
     job.setOutputValueClass(outConfig.valueClass);
     outConfig.bundle.configure(job.getConfiguration());
-    taskContext = TaskAttemptContextFactory.create(
-      job.getConfiguration(), baseContext.getTaskAttemptID());
+  }
 
-    taskContextCache.put(nameOutput, taskContext);
-    return taskContext;
+  private static OutputFormat getOutputFormat(
+      String namedOutput,
+      Job job,
+      OutputConfig outConfig) throws IOException {
+    configureJob(namedOutput, job, outConfig);
+    try {
+      return ReflectionUtils.newInstance(
+          job.getOutputFormatClass(),
+          job.getConfiguration());
+    } catch (ClassNotFoundException e) {
+      throw new IOException(e);
+    }
   }
 
-  private synchronized RecordWriter<K, V> getRecordWriter(
-      TaskAttemptContext taskContext, String namedOutput)
-      throws IOException, InterruptedException {
-    // look for record-writer in the cache
-    RecordWriter<K, V> writer = recordWriters.get(namedOutput);
+  private static class OutputState<K, V> {
+    private final TaskAttemptContext context;
+    private final RecordWriter<K, V> recordWriter;
 
-    // If not in cache, create a new one
-    if (writer == null) {
-      // get the record writer from context output format
-      taskContext.getConfiguration().set(BASE_OUTPUT_NAME, namedOutput);
-      try {
-        OutputFormat format = ReflectionUtils.newInstance(
-            taskContext.getOutputFormatClass(),
-            taskContext.getConfiguration());
-        writer = format.getRecordWriter(taskContext);
-      } catch (ClassNotFoundException e) {
-        throw new IOException(e);
+    public OutputState(TaskAttemptContext context, RecordWriter<K, V> recordWriter) {
+      this.context = context;
+      this.recordWriter = recordWriter;
+    }
+
+    public void write(K key, V value) throws IOException, InterruptedException {
+      recordWriter.write(key, value);
+    }
+
+    public void close() throws IOException, InterruptedException {
+      recordWriter.close(context);
+    }
+  }
+
+  private static class CompositeOutputCommitter extends OutputCommitter {
+
+    private final Map<String, OutputConfig> outputs;
+    private final Map<String, OutputCommitter> committers;
+
+    public CompositeOutputCommitter(Map<String, OutputConfig> outputs, Map<String, OutputCommitter> committers) {
+      this.outputs = outputs;
+      this.committers = committers;
+    }
+
+    private TaskAttemptContext getContext(String namedOutput, TaskAttemptContext baseContext) throws IOException {
+      Job job = getJob(baseContext.getJobID(), namedOutput, baseContext.getConfiguration());
+      configureJob(namedOutput, job, outputs.get(namedOutput));
+
+      return getTaskContext(baseContext, job);
+    }
+
+    @Override
+    public void setupJob(JobContext jobContext) throws IOException {
+      Configuration conf = jobContext.getConfiguration();
+      for (Map.Entry<String, OutputCommitter> e : committers.entrySet()) {
+        Job job = getJob(jobContext.getJobID(), e.getKey(), conf);
+        configureJob(e.getKey(), job, outputs.get(e.getKey()));
+        e.getValue().setupJob(job);
+      }
+    }
+
+    @Override
+    public void setupTask(TaskAttemptContext taskAttemptContext) throws IOException {
+      for (Map.Entry<String, OutputCommitter> e : committers.entrySet()) {
+        e.getValue().setupTask(getContext(e.getKey(), taskAttemptContext));
+      }
+    }
+
+    @Override
+    public boolean needsTaskCommit(TaskAttemptContext taskAttemptContext) throws IOException {
+      for (Map.Entry<String, OutputCommitter> e : committers.entrySet()) {
+        if (e.getValue().needsTaskCommit(getContext(e.getKey(), taskAttemptContext))) {
+          return true;
+        }
+      }
+      return false;
+    }
+
+    @Override
+    public void commitTask(TaskAttemptContext taskAttemptContext) throws IOException {
+      for (Map.Entry<String, OutputCommitter> e : committers.entrySet()) {
+
+        e.getValue().commitTask(getContext(e.getKey(), taskAttemptContext));
       }
-      recordWriters.put(namedOutput, writer);
     }
 
-    return writer;
+    @Override
+    public void abortTask(TaskAttemptContext taskAttemptContext) throws IOException {
+      for (Map.Entry<String, OutputCommitter> e : committers.entrySet()) {
+        e.getValue().abortTask(getContext(e.getKey(), taskAttemptContext));
+      }
+    }
+
+    @Override
+    public void commitJob(JobContext jobContext) throws IOException {
+      Configuration conf = jobContext.getConfiguration();
+      Set<Path> handledPaths = Sets.newHashSet();
+      for (Map.Entry<String, OutputCommitter> e : committers.entrySet()) {
+        OutputCommitter oc = e.getValue();
+        if (oc instanceof FileOutputCommitter) {
+          Path workPath = ((FileOutputCommitter) oc).getWorkPath();
+          if (handledPaths.contains(workPath)) {
+            continue;
+          } else {
+            handledPaths.add(workPath);
+          }
+        }
+        Job job = getJob(jobContext.getJobID(), e.getKey(), conf);
+        configureJob(e.getKey(), job, outputs.get(e.getKey()));
+        oc.commitJob(job);
+      }
+    }
+
+    @Override
+    public void abortJob(JobContext jobContext, JobStatus.State state) throws IOException {
+      Configuration conf = jobContext.getConfiguration();
+      for (Map.Entry<String, OutputCommitter> e : committers.entrySet()) {
+        Job job = getJob(jobContext.getJobID(), e.getKey(), conf);
+        configureJob(e.getKey(), job, outputs.get(e.getKey()));
+        e.getValue().abortJob(job, state);
+      }
+    }
   }
 }


Mime
View raw message