tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hit...@apache.org
Subject svn commit: r1469642 [27/36] - in /incubator/tez/branches/TEZ-1: ./ example_jobs/ example_jobs/sampleInput/ example_jobs/wc_mr_6m_1r/ example_jobs/wc_mrr_6m_3r_3r/ ljr_helper/ tez-common/ tez-common/src/ tez-common/src/main/ tez-common/src/main/java/ t...
Date Thu, 18 Apr 2013 23:54:28 GMT
Added: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezTaskOutputFiles.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezTaskOutputFiles.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezTaskOutputFiles.java (added)
+++ incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezTaskOutputFiles.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,234 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.common.task.local.output;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
+import org.apache.tez.common.Constants;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.engine.records.TezTaskID;
+
+/**
+ * Manipulate the working area for the transient store for maps and reduces.
+ *
+ * This class is used by map and reduce tasks to identify the directories that
+ * they need to write to/read from for intermediate files. The callers of
+ * these methods are from child space and see mapreduce.cluster.local.dir as
+ * taskTracker/jobCache/jobId/attemptId
+ * This class should not be used from TaskTracker space.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class TezTaskOutputFiles extends TezTaskOutput {
+
+  private Configuration conf;
+
+  private static final String SPILL_FILE_PATTERN = "%s_spill_%d.out";
+  private static final String SPILL_INDEX_FILE_PATTERN = SPILL_FILE_PATTERN
+      + ".index";
+
+  public TezTaskOutputFiles() {
+  }
+
+  // assume configured to $localdir/usercache/$user/appcache/$appId
+  private LocalDirAllocator lDirAlloc = 
+    new LocalDirAllocator(TezJobConfig.LOCAL_DIR);
+
+  /** Builds {@code TASK_OUTPUT_DIR/<attempt id>}, reading the attempt id from conf. */
+  private Path getAttemptOutputDir() {
+    String attemptId = conf.get(Constants.TEZ_ENGINE_TASK_ATTEMPT_ID);
+    return new Path(Constants.TASK_OUTPUT_DIR, attemptId);
+  }
+  
+  /**
+   * Return the path to local map output file created earlier
+   * 
+   * @return path
+   * @throws IOException
+   */
+  public Path getOutputFile() throws IOException {
+    Path attemptOutput =
+      new Path(getAttemptOutputDir(), Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING);
+    return lDirAlloc.getLocalPathToRead(attemptOutput.toString(), conf);
+  }
+
+  /**
+   * Create a local map output file name.
+   * 
+   * @param size the size of the file
+   * @return path
+   * @throws IOException
+   */
+  public Path getOutputFileForWrite(long size) throws IOException {
+    Path attemptOutput = 
+      new Path(getAttemptOutputDir(), Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING);
+    return lDirAlloc.getLocalPathForWrite(attemptOutput.toString(), size, conf);
+  }
+
+  /**
+   * Create a local map output file name on the same volume.
+   */
+  public Path getOutputFileForWriteInVolume(Path existing) {
+    Path outputDir = new Path(existing.getParent(), Constants.TASK_OUTPUT_DIR);
+    Path attemptOutputDir = new Path(outputDir,
+        conf.get(Constants.TEZ_ENGINE_TASK_ATTEMPT_ID));
+    return new Path(attemptOutputDir, Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING);
+  }
+
+  /**
+   * Return the path to a local map output index file created earlier
+   * 
+   * @return path
+   * @throws IOException
+   */
+  public Path getOutputIndexFile() throws IOException {
+    Path attemptIndexOutput =
+      new Path(getAttemptOutputDir(), Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING +
+                                      Constants.TEZ_ENGINE_TASK_OUTPUT_INDEX_SUFFIX_STRING);
+    return lDirAlloc.getLocalPathToRead(attemptIndexOutput.toString(), conf);
+  }
+
+  /**
+   * Create a local map output index file name.
+   * 
+   * @param size the size of the file
+   * @return path
+   * @throws IOException
+   */
+  public Path getOutputIndexFileForWrite(long size) throws IOException {
+    Path attemptIndexOutput =
+      new Path(getAttemptOutputDir(), Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING +
+                                      Constants.TEZ_ENGINE_TASK_OUTPUT_INDEX_SUFFIX_STRING);
+    return lDirAlloc.getLocalPathForWrite(attemptIndexOutput.toString(),
+        size, conf);
+  }
+
+  /**
+   * Create a local map output index file name on the same volume.
+   */
+  public Path getOutputIndexFileForWriteInVolume(Path existing) {
+    Path outputDir = new Path(existing.getParent(), Constants.TASK_OUTPUT_DIR);
+    Path attemptOutputDir = new Path(outputDir,
+        conf.get(Constants.TEZ_ENGINE_TASK_ATTEMPT_ID));
+    return new Path(attemptOutputDir, Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING +
+                                      Constants.TEZ_ENGINE_TASK_OUTPUT_INDEX_SUFFIX_STRING);
+  }
+
+  /**
+   * Return a local map spill file created earlier.
+   * 
+   * @param spillNumber the number
+   * @return path
+   * @throws IOException
+   */
+  public Path getSpillFile(int spillNumber) throws IOException {
+    return lDirAlloc.getLocalPathToRead(
+        String.format(SPILL_FILE_PATTERN,
+            conf.get(Constants.TEZ_ENGINE_TASK_ATTEMPT_ID), spillNumber), conf);
+  }
+
+  /**
+   * Create a local map spill file name.
+   * 
+   * @param spillNumber the spill number embedded in the file name
+   * @param size the size of the file, handed to the local dir allocator
+   * @return path whose name is {@code <attemptId>_spill_<spillNumber>.out}
+   * @throws IOException propagated from the local dir allocator
+   */
+  public Path getSpillFileForWrite(int spillNumber, long size)
+      throws IOException {
+    return lDirAlloc.getLocalPathForWrite(
+        String.format(SPILL_FILE_PATTERN,
+            conf.get(Constants.TEZ_ENGINE_TASK_ATTEMPT_ID), spillNumber), size, conf);
+  }
+
+  /**
+   * Return a local map spill index file created earlier
+   * 
+   * @param spillNumber the number
+   * @return path
+   * @throws IOException
+   */
+  public Path getSpillIndexFile(int spillNumber) throws IOException {
+    return lDirAlloc.getLocalPathToRead(
+        String.format(SPILL_INDEX_FILE_PATTERN,
+            conf.get(Constants.TEZ_ENGINE_TASK_ATTEMPT_ID), spillNumber), conf);
+  }
+
+  /**
+   * Create a local map spill index file name.
+   * 
+   * @param spillNumber the number
+   * @param size the size of the file
+   * @return path
+   * @throws IOException
+   */
+  public Path getSpillIndexFileForWrite(int spillNumber, long size)
+      throws IOException {
+    return lDirAlloc.getLocalPathForWrite(
+        String.format(SPILL_INDEX_FILE_PATTERN,
+            conf.get(Constants.TEZ_ENGINE_TASK_ATTEMPT_ID), spillNumber), size, conf);
+  }
+
+  /**
+   * Return a local reduce input file created earlier
+   * 
+   * @param mapId a map task id
+   * @return path
+   * @throws IOException 
+   */
+  public Path getInputFile(int mapId) throws IOException {
+    throw new UnsupportedOperationException("Incompatible with LocalRunner");
+  }
+
+  /**
+   * Create a local reduce input file name.
+   * 
+   * @param mapId a map task id
+   * @param size the size of the file
+   * @return path
+   * @throws IOException
+   */
+  public Path getInputFileForWrite(TezTaskID mapId,
+      long size) throws IOException {
+    return lDirAlloc.getLocalPathForWrite(String.format(
+        Constants.TEZ_ENGINE_TASK_INPUT_FILE_FORMAT_STRING,
+        getAttemptOutputDir().toString(), mapId.getId()),
+        size, conf);
+  }
+
+  /** Removes all of the files related to a task. */
+  public void removeAll() throws IOException {
+    throw new UnsupportedOperationException("Incompatible with LocalRunner");
+  }
+
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  public Configuration getConf() {
+    return conf;
+  }
+  
+}

Propchange: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezTaskOutputFiles.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/lib/input/LocalMergedInput.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/lib/input/LocalMergedInput.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/lib/input/LocalMergedInput.java (added)
+++ incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/lib/input/LocalMergedInput.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.engine.lib.input;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.TezTask;
+import org.apache.tez.common.TezTaskReporter;
+import org.apache.tez.engine.api.Input;
+import org.apache.tez.engine.api.Master;
+import org.apache.tez.engine.common.combine.CombineInput;
+import org.apache.tez.engine.common.localshuffle.LocalShuffle;
+import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;
+
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+
+/**
+ * {@link LocalMergedInput} is an {@link Input} which shuffles intermediate
+ * sorted data, merges them and provides key/<values> to the consumer. 
+ */
+public class LocalMergedInput extends ShuffledMergedInput {
+
+  TezRawKeyValueIterator rIter = null;
+
+  private final TezTask task;
+  
+  private Configuration conf;
+  private CombineInput raw;
+
+  @Inject
+  public LocalMergedInput(
+      @Assisted TezTask task
+      ) {
+    super(task);
+    this.task = task;
+  }
+
+  public void initialize(Configuration conf, Master master) throws IOException,
+      InterruptedException {
+    this.conf = conf;
+
+    LocalShuffle shuffle =
+        new LocalShuffle(task, this.conf, (TezTaskReporter)master);
+    rIter = shuffle.run();
+    raw = new CombineInput(rIter);
+  }
+
+  public boolean hasNext() throws IOException, InterruptedException {
+    return raw.hasNext();
+  }
+
+  public Object getNextKey() throws IOException, InterruptedException {
+    return raw.getNextKey();
+  }
+
+  @SuppressWarnings({ "unchecked", "rawtypes" })
+  public Iterable getNextValues() 
+      throws IOException, InterruptedException {
+    return raw.getNextValues();
+  }
+
+  public float getProgress() throws IOException, InterruptedException {
+    return raw.getProgress();
+  }
+
+  public void close() throws IOException {
+    raw.close();
+  }
+
+  public TezRawKeyValueIterator getIterator() {
+    return rIter;
+  }
+  
+}

Propchange: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/lib/input/LocalMergedInput.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledMergedInput.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledMergedInput.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledMergedInput.java (added)
+++ incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledMergedInput.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.engine.lib.input;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.common.TezTask;
+import org.apache.tez.common.TezTaskReporter;
+import org.apache.tez.engine.api.Input;
+import org.apache.tez.engine.api.Master;
+import org.apache.tez.engine.common.combine.CombineInput;
+import org.apache.tez.engine.common.shuffle.impl.Shuffle;
+import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;
+
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+
+/**
+ * {@link ShuffledMergedInput} is an {@link Input} which shuffles intermediate
+ * sorted data, merges them and provides key/<values> to the consumer. 
+ */
+public class ShuffledMergedInput implements Input {
+
+  static final Log LOG = LogFactory.getLog(ShuffledMergedInput.class);
+  TezRawKeyValueIterator rIter = null;
+
+  private TezTask task;
+  
+  private Configuration conf;
+  private CombineInput raw;
+
+  /** Stores {@code task} for {@link #initialize}; {@link #setTask} may replace it. */
+  @Inject
+  public ShuffledMergedInput(@Assisted TezTask task) {
+    this.task = task;
+  }
+
+  public void setTask(TezTask task) {
+    this.task = task;
+  }
+  
+  public void initialize(Configuration conf, Master master) throws IOException,
+      InterruptedException {
+    this.conf = conf;
+    
+    Shuffle shuffle = 
+      new Shuffle(
+          task, this.conf,      
+          this.conf.getInt(
+              TezJobConfig.TEZ_ENGINE_TASK_INDEGREE, 
+              TezJobConfig.DEFAULT_TEZ_ENGINE_TASK_INDEGREE),
+          (TezTaskReporter)master, 
+          task.getCombineProcessor());
+    rIter = shuffle.run();
+    
+    raw = new CombineInput(rIter);
+  }
+
+  public boolean hasNext() throws IOException, InterruptedException {
+    return raw.hasNext();
+  }
+
+  public Object getNextKey() throws IOException, InterruptedException {
+    return raw.getNextKey();
+  }
+
+  @SuppressWarnings({ "unchecked", "rawtypes" })
+  public Iterable getNextValues() 
+      throws IOException, InterruptedException {
+    return raw.getNextValues();
+  }
+
+  public float getProgress() throws IOException, InterruptedException {
+    return raw.getProgress();
+  }
+
+  public void close() throws IOException {
+    raw.close();
+  }
+
+  public TezRawKeyValueIterator getIterator() {
+    return rIter;
+  }
+  
+}

Propchange: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledMergedInput.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/lib/output/InMemorySortedOutput.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/lib/output/InMemorySortedOutput.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/lib/output/InMemorySortedOutput.java (added)
+++ incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/lib/output/InMemorySortedOutput.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.engine.lib.output;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.TezTask;
+import org.apache.tez.engine.api.Master;
+import org.apache.tez.engine.api.Output;
+import org.apache.tez.engine.common.sort.SortingOutput;
+import org.apache.tez.engine.common.sort.impl.dflt.InMemoryShuffleSorter;
+import org.apache.tez.engine.records.OutputContext;
+
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+
+/**
+ * {@link InMemorySortedOutput} is an {@link Output} which sorts key/value pairs 
+ * written to it and persists it to a file.
+ */
+public class InMemorySortedOutput implements SortingOutput {
+  
+  protected InMemoryShuffleSorter sorter;
+  
+  @Inject
+  public InMemorySortedOutput(
+      @Assisted TezTask task
+      ) throws IOException {
+    sorter = new InMemoryShuffleSorter(task);
+  }
+  
+  public void initialize(Configuration conf, Master master) 
+      throws IOException, InterruptedException {
+    sorter.initialize(conf, master);
+  }
+
+  public void setTask(TezTask task) {
+    sorter.setTask(task);
+  }
+  
+  public void write(Object key, Object value) throws IOException,
+      InterruptedException {
+    sorter.write(key, value);
+  }
+
+  public void close() throws IOException, InterruptedException {
+    sorter.flush();
+    sorter.close();
+  }
+
+  @Override
+  public OutputContext getOutputContext() {
+    return sorter.getOutputContext();
+  }
+
+  public InMemoryShuffleSorter getSorter()  {
+    return sorter;
+  }
+}

Propchange: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/lib/output/InMemorySortedOutput.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/lib/output/LocalOnFileSorterOutput.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/lib/output/LocalOnFileSorterOutput.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/lib/output/LocalOnFileSorterOutput.java (added)
+++ incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/lib/output/LocalOnFileSorterOutput.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,64 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.engine.lib.output;
+
+import java.io.IOException;
+
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tez.common.TezTask;
+import org.apache.tez.engine.common.task.local.output.TezTaskOutput;
+
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+
+public class LocalOnFileSorterOutput extends OnFileSortedOutput {
+
+  private static final Log LOG = LogFactory.getLog(LocalOnFileSorterOutput.class);
+  
+  @Inject
+  public LocalOnFileSorterOutput(
+      @Assisted TezTask task
+      ) throws IOException {
+    super(task);
+  }
+
+  /** Closes the sorter, then renames its output file to the local input-file name. */
+  @Override
+  public void close() throws IOException, InterruptedException {
+    LOG.info("XXX close");
+    super.close();
+
+    TezTaskOutput mapOutputFile = sorter.getMapOutput();
+    FileSystem localFs = FileSystem.getLocal(mapOutputFile.getConf());
+    Path src = mapOutputFile.getOutputFile();
+    Path dst = mapOutputFile.getInputFileForWrite(
+        sorter.getTaskAttemptId().getTaskID(),
+        localFs.getFileStatus(src).getLen());
+
+    // rename() can signal failure by returning false, so check it explicitly.
+    if (!localFs.rename(src, dst)) {
+      throw new IOException("Failed to rename " + src + " to " + dst);
+    }
+    LOG.info("XXX renaming src = " + src + ", dst = " + dst);
+  }
+}

Propchange: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/lib/output/LocalOnFileSorterOutput.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/lib/output/OnFileSortedOutput.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/lib/output/OnFileSortedOutput.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/lib/output/OnFileSortedOutput.java (added)
+++ incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/lib/output/OnFileSortedOutput.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.engine.lib.output;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.TezTask;
+import org.apache.tez.engine.api.Master;
+import org.apache.tez.engine.api.Output;
+import org.apache.tez.engine.common.sort.SortingOutput;
+import org.apache.tez.engine.common.sort.impl.ExternalSorter;
+import org.apache.tez.engine.common.sort.impl.dflt.DefaultSorter;
+import org.apache.tez.engine.records.OutputContext;
+
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+
+/**
+ * {@link OnFileSortedOutput} is an {@link Output} which sorts key/value pairs 
+ * written to it and persists it to a file.
+ */
+public class OnFileSortedOutput implements SortingOutput {
+  
+  protected ExternalSorter sorter;
+  
+  @Inject
+  public OnFileSortedOutput(
+      @Assisted TezTask task
+      ) throws IOException {
+    sorter = new DefaultSorter(task);
+  }
+  
+  public void initialize(Configuration conf, Master master) 
+      throws IOException, InterruptedException {
+    sorter.initialize(conf, master);
+  }
+
+  public void setTask(TezTask task) {
+    sorter.setTask(task);
+  }
+  
+  public void write(Object key, Object value) throws IOException,
+      InterruptedException {
+    sorter.write(key, value);
+  }
+
+  public void close() throws IOException, InterruptedException {
+    sorter.flush();
+    sorter.close();
+  }
+
+  @Override
+  public OutputContext getOutputContext() {
+    return null;
+  }
+
+}

Propchange: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/lib/output/OnFileSortedOutput.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/runtime/InputFactory.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/runtime/InputFactory.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/runtime/InputFactory.java (added)
+++ incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/runtime/InputFactory.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,28 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.engine.runtime;
+
+import org.apache.tez.common.TezTask;
+import org.apache.tez.engine.api.Input;
+
+public interface InputFactory {
+  
+  Input create(TezTask task);
+  
+}

Propchange: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/runtime/InputFactory.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/runtime/OutputFactory.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/runtime/OutputFactory.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/runtime/OutputFactory.java (added)
+++ incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/runtime/OutputFactory.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,28 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.engine.runtime;
+
+import org.apache.tez.common.TezTask;
+import org.apache.tez.engine.api.Output;
+
+public interface OutputFactory {
+  
+  Output create(TezTask task);
+  
+}

Propchange: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/runtime/OutputFactory.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/runtime/ProcessorFactory.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/runtime/ProcessorFactory.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/runtime/ProcessorFactory.java (added)
+++ incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/runtime/ProcessorFactory.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,28 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.engine.runtime;
+
+import org.apache.tez.common.TezTask;
+import org.apache.tez.engine.api.Processor;
+
+public interface ProcessorFactory {
+  /** Creates the {@link Processor} for the given task. */
+  Processor create(TezTask task);
+  
+}

Propchange: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/runtime/ProcessorFactory.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/runtime/TaskFactory.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/runtime/TaskFactory.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/runtime/TaskFactory.java (added)
+++ incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/runtime/TaskFactory.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,30 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.engine.runtime;
+
+import org.apache.tez.engine.api.Input;
+import org.apache.tez.engine.api.Output;
+import org.apache.tez.engine.api.Processor;
+import org.apache.tez.engine.api.Task;
+
+public interface TaskFactory {
+  /** Creates a {@link Task} from the given input, processor and output. */
+  Task create(Input in, Processor processor, Output out);
+
+}

Propchange: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/runtime/TaskFactory.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/runtime/TezEngineFactory.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/runtime/TezEngineFactory.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/runtime/TezEngineFactory.java (added)
+++ incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/runtime/TezEngineFactory.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,28 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.engine.runtime;
+
+import org.apache.tez.common.TezTask;
+import org.apache.tez.engine.api.Task;
+
+public interface TezEngineFactory {
+  /** Creates the {@link Task} for the given task context. */
+  Task createTask(TezTask taskContext);
+
+}

Propchange: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/runtime/TezEngineFactory.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/runtime/TezEngineFactoryImpl.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/runtime/TezEngineFactoryImpl.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/runtime/TezEngineFactoryImpl.java (added)
+++ incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/runtime/TezEngineFactoryImpl.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,57 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.engine.runtime;
+
+import org.apache.tez.common.TezTask;
+import org.apache.tez.engine.api.Input;
+import org.apache.tez.engine.api.Output;
+import org.apache.tez.engine.api.Processor;
+import org.apache.tez.engine.api.Task;
+
+import com.google.inject.Inject;
+
+/** Assembles a {@link Task} from the injected input/processor/output factories. */
+public class TezEngineFactoryImpl implements TezEngineFactory {
+
+  private final InputFactory inputFactory;
+  private final ProcessorFactory processorFactory;
+  private final OutputFactory outputFactory;
+  private final TaskFactory taskFactory;
+
+  @Inject
+  public TezEngineFactoryImpl(InputFactory inputFactory, ProcessorFactory processorFactory,
+      OutputFactory outputFactory, TaskFactory taskFactory) {
+    this.inputFactory = inputFactory;
+    this.processorFactory = processorFactory;
+    this.outputFactory = outputFactory;
+    this.taskFactory = taskFactory;
+  }
+
+  /**
+   * Creates the input, output and processor for {@code taskContext} (in that
+   * order) and hands them to the {@link TaskFactory}.
+   */
+  @Override
+  public Task createTask(TezTask taskContext) {
+    Input in = inputFactory.create(taskContext);
+    Output out = outputFactory.create(taskContext);
+    Processor processor = processorFactory.create(taskContext);
+    return taskFactory.create(in, processor, out);
+  }
+}

Propchange: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/runtime/TezEngineFactoryImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/task/RuntimeTask.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/task/RuntimeTask.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/task/RuntimeTask.java (added)
+++ incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/task/RuntimeTask.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.engine.task;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.engine.api.Input;
+import org.apache.tez.engine.api.Master;
+import org.apache.tez.engine.api.Output;
+import org.apache.tez.engine.api.Processor;
+import org.apache.tez.engine.api.Task;
+
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+
+/** {@link Task} that holds an input, an output and a processor and delegates to the processor. */
+public class RuntimeTask implements Task {
+
+  private final Input in;
+  private final Output out;
+  private final Processor processor;
+
+  private Configuration conf;
+  private Master master;
+
+  @Inject
+  public RuntimeTask(@Assisted Processor processor, @Assisted Input in,
+      @Assisted Output out) {
+    this.processor = processor;
+    this.in = in;
+    this.out = out;
+  }
+
+  /** Remembers {@code conf} and {@code master}, then initializes the processor with them. */
+  public void initialize(Configuration conf, Master master)
+      throws IOException, InterruptedException {
+    this.conf = conf;
+    this.master = master;
+    // NOTE: Allow processor to initialize input/output
+    processor.initialize(conf, master);
+  }
+
+  @Override
+  public Input getInput() {
+    return in;
+  }
+
+  @Override
+  public Processor getProcessor() {
+    return processor;
+  }
+
+  @Override
+  public Output getOutput() {
+    return out;
+  }
+
+  /** Runs the processor over this task's input and output. */
+  public void run() throws IOException, InterruptedException {
+    processor.process(in, out);
+  }
+
+  /** Closes the processor. */
+  public void close() throws IOException, InterruptedException {
+    // NOTE: Allow processor to close input/output
+    processor.close();
+  }
+
+}

Propchange: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/task/RuntimeTask.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-mapreduce-examples/dev-support/findbugs-exclude.xml
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce-examples/dev-support/findbugs-exclude.xml?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce-examples/dev-support/findbugs-exclude.xml (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce-examples/dev-support/findbugs-exclude.xml Thu Apr 18 23:54:18 2013
@@ -0,0 +1,69 @@
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+<FindBugsFilter>
+ <!-- NOTE(review): class names assume the examples live under org.apache.tez.mapreduce.examples; confirm each class exists. -->
+ <!-- Ignore some irrelevant serialization warnings -->
+  <Match>
+    <Class name="org.apache.tez.mapreduce.examples.SecondarySort$FirstGroupingComparator" />
+    <Bug pattern="SE_COMPARATOR_SHOULD_BE_SERIALIZABLE" />
+  </Match>
+
+  <Match>
+    <Class name="org.apache.tez.mapreduce.examples.SecondarySort$IntPair$Comparator" />
+    <Bug pattern="SE_COMPARATOR_SHOULD_BE_SERIALIZABLE" />
+  </Match>
+
+
+ <!-- Ignore the irrelevant resource cleanup warnings-->
+  <Match>
+    <Class name="org.apache.tez.mapreduce.examples.DBCountPageView" />
+    <Method name="verify" />
+    <Bug pattern="OBL_UNSATISFIED_OBLIGATION" />
+  </Match>
+
+ <!-- Ignore the irrelevant exposed-representation warnings-->
+  <Match>
+    <Class name="org.apache.tez.mapreduce.examples.dancing.Pentomino$Piece" />
+    <Bug pattern="EI_EXPOSE_REP2" />
+  </Match>
+
+  <!-- Ignore the irrelevant package protection warnings-->
+  <Match>
+    <Class name="org.apache.tez.mapreduce.examples.dancing.Pentomino" />
+     <Or>
+      <Field name="fourRotations" />
+      <Field name="oneRotation" />
+      <Field name="twoRotations" />
+     </Or>
+    <Bug pattern="MS_PKGPROTECT" />
+  </Match>
+
+   <!-- Ignore the irrelevant right shift warnings, as only positive integers are given as input-->
+  <Match>
+    <Class name="org.apache.tez.mapreduce.examples.terasort.Unsigned16" />
+    <Method name="getHexDigit" />
+    <Bug pattern="ICAST_QUESTIONABLE_UNSIGNED_RIGHT_SHIFT" />
+  </Match>
+
+  <Match>
+    <Class name="org.apache.tez.mapreduce.examples.terasort.TeraInputFormat" />
+      <Method name="getSplits" />
+    <Bug pattern="ST_WRITE_TO_STATIC_FROM_INSTANCE_METHOD" />
+  </Match>
+
+</FindBugsFilter>

Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce-examples/dev-support/findbugs-exclude.xml
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-mapreduce-examples/pom.xml
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce-examples/pom.xml?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce-examples/pom.xml (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce-examples/pom.xml Thu Apr 18 23:54:18 2013
@@ -0,0 +1,128 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License. See accompanying LICENSE file.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
+                      http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.tez</groupId>
+    <artifactId>tez</artifactId>
+    <version>0.2.0-SNAPSHOT</version>
+  </parent>
+
+  <groupId>org.apache.tez</groupId>
+  <artifactId>tez-mapreduce-examples</artifactId>
+  <version>0.2.0-SNAPSHOT</version>
+  <packaging>jar</packaging>
+
+  <dependencies>
+    <dependency>
+      <groupId>commons-cli</groupId>
+      <artifactId>commons-cli</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>commons-logging</groupId>
+      <artifactId>commons-logging</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+    <!--dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs</artifactId>
+      <scope>runtime</scope>
+    </dependency>
+    <dependency>
+       <groupId>org.apache.hadoop</groupId>
+       <artifactId>hadoop-hdfs</artifactId>
+       <scope>test</scope>
+       <type>test-jar</type>
+     </dependency-->
+     <dependency>
+       <groupId>org.apache.hadoop</groupId>
+       <artifactId>hadoop-yarn-server-tests</artifactId>
+       <scope>test</scope>
+       <type>test-jar</type>
+     </dependency>
+     <dependency>
+       <groupId>org.apache.tez</groupId>
+       <artifactId>tez-dag</artifactId>
+       <scope>provided</scope>
+     </dependency>
+     <!--dependency>
+       <groupId>org.apache.hadoop</groupId>
+       <artifactId>hadoop-mapreduce-client-app</artifactId>
+       <type>test-jar</type>
+       <scope>test</scope>
+     </dependency-->
+     <!--dependency>
+       <groupId>org.apache.hadoop</groupId>
+       <artifactId>hadoop-mapreduce-client-hs</artifactId>
+       <scope>test</scope>
+     </dependency-->
+     <!--dependency>
+       <groupId>org.hsqldb</groupId>
+       <artifactId>hsqldb</artifactId>
+       <scope>provided</scope>
+     </dependency-->
+     <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+      <scope>provided</scope>
+     </dependency>
+  </dependencies>
+  
+  <build>
+   <plugins>
+    <plugin>
+    <groupId>org.apache.maven.plugins</groupId>
+     <artifactId>maven-jar-plugin</artifactId>
+      <configuration>
+       <archive>
+         <manifest>
+           <mainClass>org.apache.tez.mapreduce.examples.ExampleDriver</mainClass>
+         </manifest>
+       </archive>
+     </configuration>
+    </plugin>
+    <plugin>
+      <groupId>org.apache.rat</groupId>
+      <artifactId>apache-rat-plugin</artifactId>
+      <configuration>
+      </configuration>
+    </plugin>
+   </plugins>
+  </build>
+</project>

Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce-examples/pom.xml
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.examples;
+
+import org.apache.hadoop.util.ProgramDriver;
+import org.apache.tez.mapreduce.examples.terasort.TeraGen;
+import org.apache.tez.mapreduce.examples.terasort.TeraSort;
+import org.apache.tez.mapreduce.examples.terasort.TeraValidate;
+
+/**
+ * Command-line entry point that registers the example programs with a
+ * {@link ProgramDriver} and lets it dispatch on the arguments.
+ */
+public class ExampleDriver {
+
+  public static void main(String[] argv) {
+    final ProgramDriver driver = new ProgramDriver();
+    int status = -1;
+    try {
+      driver.addClass("wordcount", WordCount.class,
+          "A map/reduce program that counts the words in the input files.");
+      driver.addClass("wordcountmrrtest", WordCountMRRTest.class,
+          "A map/reduce program that counts the words in the input files. Map splits on spaces. First reduce splits on \".\"");
+      driver.addClass("randomwriter", RandomWriter.class,
+          "A map/reduce program that writes 10GB of random data per node.");
+      driver.addClass("randomtextwriter", RandomTextWriter.class,
+          "A map/reduce program that writes 10GB of random textual data per node.");
+      driver.addClass("sort", Sort.class, "A map/reduce program that sorts the data written by the random writer.");
+      driver.addClass("secondarysort", SecondarySort.class,
+          "An example defining a secondary sort to the reduce.");
+      driver.addClass("join", Join.class, "A job that effects a join over sorted, equally partitioned datasets");
+      driver.addClass("teragen", TeraGen.class, "Generate data for the terasort");
+      driver.addClass("terasort", TeraSort.class, "Run the terasort");
+      driver.addClass("teravalidate", TeraValidate.class, "Checking results of terasort");
+      status = driver.driver(argv);
+    } catch (Throwable t) {
+      t.printStackTrace();
+    }
+
+    // Exit with -1 unless the driver ran and returned its own status.
+    System.exit(status);
+  }
+}
+	

Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/Join.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/Join.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/Join.java (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/Join.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.examples;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.ClusterStatus;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.join.CompositeInputFormat;
+import org.apache.hadoop.mapreduce.lib.join.TupleWritable;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * Given a set of sorted datasets keyed with the same class and yielding
+ * equal partitions, it is possible to effect a join of those datasets 
+ * prior to the map. This example performs such a join.
+ *
+ * To run: bin/hadoop jar build/hadoop-examples.jar join
+ *            [-r <i>reduces</i>]
+ *            [-inFormat <i>input format class</i>] 
+ *            [-outFormat <i>output format class</i>] 
+ *            [-outKey <i>output key class</i>] 
+ *            [-outValue <i>output value class</i>] 
+ *            [-joinOp &lt;inner|outer|override&gt;]
+ *            [<i>in-dir</i>]* <i>in-dir</i> <i>out-dir</i> 
+ */
+public class Join extends Configured implements Tool {
+  public static final String REDUCES_PER_HOST = "mapreduce.join.reduces_per_host";
+
+  /** Prints the command-line usage to stdout and returns the exit code 2. */
+  static int printUsage() {
+    System.out.println("join [-r <reduces>] " +
+                       "[-inFormat <input format class>] " +
+                       "[-outFormat <output format class>] " + 
+                       "[-outKey <output key class>] " +
+                       "[-outValue <output value class>] " +
+                       "[-joinOp <inner|outer|override>] " +
+                       "[input]* <input> <output>");
+    ToolRunner.printGenericCommandUsage(System.out);
+    return 2;
+  }
+
+  /**
+   * The main driver for the join program.
+   * Invoke this method to submit the map/reduce job.
+   * @throws IOException When there are communication problems with the
+   *                     job tracker.
+   */
+  @Override
+  @SuppressWarnings("unchecked")
+  public int run(String[] args) throws Exception {
+    Configuration conf = getConf();
+    JobClient client = new JobClient(conf);
+    ClusterStatus cluster = client.getClusterStatus();
+    // Default to 90% of the cluster's reduce capacity; REDUCES_PER_HOST and -r override it.
+    int numReduces = (int) (cluster.getMaxReduceTasks() * 0.9);
+    String reducesPerHost = conf.get(REDUCES_PER_HOST);
+    if (reducesPerHost != null) {
+      numReduces = cluster.getTaskTrackers() * Integer.parseInt(reducesPerHost);
+    }
+    Job job = new Job(conf);
+    job.setJobName("join");
+    job.setJarByClass(Join.class);
+
+    job.setMapperClass(Mapper.class);
+    job.setReducerClass(Reducer.class);
+
+    Class<? extends InputFormat> inputFormatClass = SequenceFileInputFormat.class;
+    Class<? extends OutputFormat> outputFormatClass = SequenceFileOutputFormat.class;
+    Class<? extends WritableComparable> outputKeyClass = BytesWritable.class;
+    Class<? extends Writable> outputValueClass = TupleWritable.class;
+    String op = "inner";
+    List<String> otherArgs = new ArrayList<String>();
+    for(int i=0; i < args.length; ++i) {
+      try {
+        // Each flag consumes the argument after it; anything else is kept as a path.
+        if ("-r".equals(args[i])) {
+          numReduces = Integer.parseInt(args[++i]);
+        } else if ("-inFormat".equals(args[i])) {
+          inputFormatClass = Class.forName(args[++i]).asSubclass(InputFormat.class);
+        } else if ("-outFormat".equals(args[i])) {
+          outputFormatClass = Class.forName(args[++i]).asSubclass(OutputFormat.class);
+        } else if ("-outKey".equals(args[i])) {
+          outputKeyClass = Class.forName(args[++i]).asSubclass(WritableComparable.class);
+        } else if ("-outValue".equals(args[i])) {
+          outputValueClass = Class.forName(args[++i]).asSubclass(Writable.class);
+        } else if ("-joinOp".equals(args[i])) {
+          op = args[++i];
+        } else {
+          otherArgs.add(args[i]);
+        }
+      } catch (NumberFormatException except) {
+        System.out.println("ERROR: Integer expected instead of " + args[i]);
+        return printUsage();
+      } catch (ArrayIndexOutOfBoundsException except) {
+        System.out.println("ERROR: Required parameter missing from " +
+            args[i-1]);
+        return printUsage(); // returns the usage exit code; does not call System.exit
+      }
+    }
+
+    // Set user-supplied (possibly default) job configs
+    job.setNumReduceTasks(numReduces);
+
+    if (otherArgs.size() < 2) {
+      System.out.println("ERROR: Wrong number of parameters: ");
+      return printUsage();
+    }
+
+    // The last positional argument is the output directory; the rest are join inputs.
+    FileOutputFormat.setOutputPath(job, 
+      new Path(otherArgs.remove(otherArgs.size() - 1)));
+    List<Path> plist = new ArrayList<Path>(otherArgs.size());
+    for (String s : otherArgs) {
+      plist.add(new Path(s));
+    }
+
+    job.setInputFormatClass(CompositeInputFormat.class);
+    job.getConfiguration().set(CompositeInputFormat.JOIN_EXPR, 
+      CompositeInputFormat.compose(op, inputFormatClass,
+      plist.toArray(new Path[0])));
+    job.setOutputFormatClass(outputFormatClass);
+
+    job.setOutputKeyClass(outputKeyClass);
+    job.setOutputValueClass(outputValueClass);
+
+    Date startTime = new Date();
+    System.out.println("Job started: " + startTime);
+    int ret = job.waitForCompletion(true) ? 0 : 1 ;
+    Date endTime = new Date();
+    System.out.println("Job ended: " + endTime);
+    System.out.println("The job took " + 
+        (endTime.getTime() - startTime.getTime()) /1000 + " seconds.");
+    return ret;
+  }
+
+  /** Runs the join tool through {@link ToolRunner} and exits with its return code. */
+  public static void main(String[] args) throws Exception {
+    int res = ToolRunner.run(new Configuration(), new Join(), args);
+    System.exit(res);
+  }
+
+}

Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/Join.java
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message