tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acmur...@apache.org
Subject svn commit: r1457129 [6/38] - in /incubator/tez: ./ tez-ampool/ tez-ampool/src/ tez-ampool/src/main/ tez-ampool/src/main/bin/ tez-ampool/src/main/conf/ tez-ampool/src/main/java/ tez-ampool/src/main/java/org/ tez-ampool/src/main/java/org/apache/ tez-amp...
Date Fri, 15 Mar 2013 21:26:48 GMT
Added: incubator/tez/tez-common/src/main/java/org/apache/tez/common/counters/LimitExceededException.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-common/src/main/java/org/apache/tez/common/counters/LimitExceededException.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-common/src/main/java/org/apache/tez/common/counters/LimitExceededException.java (added)
+++ incubator/tez/tez-common/src/main/java/org/apache/tez/common/counters/LimitExceededException.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.common.counters;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+@InterfaceAudience.Private
+public class LimitExceededException extends RuntimeException {
+
+  private static final long serialVersionUID = 1L;
+
+  public LimitExceededException(String msg) {
+    super(msg);
+  }
+
+  // Only allows chaining of related exceptions
+  public LimitExceededException(LimitExceededException cause) {
+    super(cause);
+  }
+}

Added: incubator/tez/tez-common/src/main/java/org/apache/tez/common/counters/Limits.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-common/src/main/java/org/apache/tez/common/counters/Limits.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-common/src/main/java/org/apache/tez/common/counters/Limits.java (added)
+++ incubator/tez/tez-common/src/main/java/org/apache/tez/common/counters/Limits.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.common.counters;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.TezJobConfig;
+
+@InterfaceAudience.Private
+public class Limits {
+
+  static final Configuration conf = new Configuration();
+  public static final int GROUP_NAME_MAX =
+      conf.getInt(TezJobConfig.COUNTER_GROUP_NAME_MAX_KEY, 
+          TezJobConfig.COUNTER_GROUP_NAME_MAX_DEFAULT);
+  public static final int COUNTER_NAME_MAX =
+      conf.getInt(TezJobConfig.COUNTER_NAME_MAX_KEY, 
+          TezJobConfig.COUNTER_NAME_MAX_DEFAULT);
+  public static final int GROUPS_MAX =
+      conf.getInt(TezJobConfig.COUNTER_GROUPS_MAX_KEY, 
+          TezJobConfig.COUNTER_GROUPS_MAX_DEFAULT);
+  public static final int COUNTERS_MAX =
+      conf.getInt(TezJobConfig.COUNTERS_MAX_KEY, TezJobConfig.
+          COUNTERS_MAX_DEFAULT);
+
+  private int totalCounters;
+  private LimitExceededException firstViolation;
+
+  public static String filterName(String name, int maxLen) {
+    return name.length() > maxLen ? name.substring(0, maxLen - 1) : name;
+  }
+
+  public static String filterCounterName(String name) {
+    return filterName(name, COUNTER_NAME_MAX);
+  }
+
+  public static String filterGroupName(String name) {
+    return filterName(name, GROUP_NAME_MAX);
+  }
+
+  public synchronized void checkCounters(int size) {
+    if (firstViolation != null) {
+      throw new LimitExceededException(firstViolation);
+    }
+    if (size > COUNTERS_MAX) {
+      firstViolation = new LimitExceededException("Too many counters: "+ size +
+                                                  " max="+ COUNTERS_MAX);
+      throw firstViolation;
+    }
+  }
+
+  public synchronized void incrCounters() {
+    checkCounters(totalCounters + 1);
+    ++totalCounters;
+  }
+
+  public synchronized void checkGroups(int size) {
+    if (firstViolation != null) {
+      throw new LimitExceededException(firstViolation);
+    }
+    if (size > GROUPS_MAX) {
+      firstViolation = new LimitExceededException("Too many counter groups: "+
+                                                  size +" max="+ GROUPS_MAX);
+    }
+  }
+
+  public synchronized LimitExceededException violation() {
+    return firstViolation;
+  }
+}

Added: incubator/tez/tez-common/src/main/java/org/apache/tez/common/counters/ResourceBundles.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-common/src/main/java/org/apache/tez/common/counters/ResourceBundles.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-common/src/main/java/org/apache/tez/common/counters/ResourceBundles.java (added)
+++ incubator/tez/tez-common/src/main/java/org/apache/tez/common/counters/ResourceBundles.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.common.counters;
+
+import java.util.Locale;
+import java.util.ResourceBundle;
+import java.util.MissingResourceException;
+
+/**
+ * Helper class to handle resource bundles in a saner way
+ */
+public class ResourceBundles {
+
+  /**
+   * Get a resource bundle
+   * @param bundleName of the resource
+   * @return the resource bundle
+   * @throws MissingResourceException
+   */
+  public static ResourceBundle getBundle(String bundleName) {
+    return ResourceBundle.getBundle(bundleName.replace('$', '_'),
+        Locale.getDefault(), Thread.currentThread().getContextClassLoader());
+  }
+
+  /**
+   * Get a resource given bundle name and key
+   * @param <T> type of the resource
+   * @param bundleName name of the resource bundle
+   * @param key to lookup the resource
+   * @param suffix for the key to lookup
+   * @param defaultValue of the resource
+   * @return the resource or the defaultValue
+   * @throws ClassCastException if the resource found doesn't match T
+   */
+  @SuppressWarnings("unchecked")
+  public static synchronized <T> T getValue(String bundleName, String key,
+                                            String suffix, T defaultValue) {
+    T value;
+    try {
+      ResourceBundle bundle = getBundle(bundleName);
+      value = (T) bundle.getObject(getLookupKey(key, suffix));
+    }
+    catch (Exception e) {
+      return defaultValue;
+    }
+    return value == null ? defaultValue : value;
+  }
+
+  private static String getLookupKey(String key, String suffix) {
+    if (suffix == null || suffix.isEmpty()) return key;
+    return key + suffix;
+  }
+
+  /**
+   * Get the counter group display name
+   * @param group the group name to lookup
+   * @param defaultValue of the group
+   * @return the group display name
+   */
+  public static String getCounterGroupName(String group, String defaultValue) {
+    return getValue(group, "CounterGroupName", "", defaultValue);
+  }
+
+  /**
+   * Get the counter display name
+   * @param group the counter group name for the counter
+   * @param counter the counter name to lookup
+   * @param defaultValue of the counter
+   * @return the counter display name
+   */
+  public static String getCounterName(String group, String counter,
+                                      String defaultValue) {
+    return getValue(group, counter, ".name", defaultValue);
+  }
+}

Added: incubator/tez/tez-common/src/main/java/org/apache/tez/common/counters/TaskCounter.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-common/src/main/java/org/apache/tez/common/counters/TaskCounter.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-common/src/main/java/org/apache/tez/common/counters/TaskCounter.java (added)
+++ incubator/tez/tez-common/src/main/java/org/apache/tez/common/counters/TaskCounter.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.common.counters;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+// TODO TEZAM5 For MR compatibility, a conversion from tez.TaskCounters to
+// mapreduce.TaskCounters will likely be required somewhere.
+// Similarly for FileSystemCounters and others.
+
+// Counters used by Task classes
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public enum TaskCounter {
+  MAP_INPUT_RECORDS, 
+  MAP_OUTPUT_RECORDS,
+  MAP_SKIPPED_RECORDS,
+  MAP_OUTPUT_BYTES,
+  MAP_OUTPUT_MATERIALIZED_BYTES,
+  SPLIT_RAW_BYTES,
+  COMBINE_INPUT_RECORDS,
+  COMBINE_OUTPUT_RECORDS,
+  REDUCE_INPUT_GROUPS,
+  REDUCE_SHUFFLE_BYTES,
+  REDUCE_INPUT_RECORDS,
+  REDUCE_OUTPUT_RECORDS,
+  REDUCE_SKIPPED_GROUPS,
+  REDUCE_SKIPPED_RECORDS,
+  SPILLED_RECORDS,
+  SHUFFLED_MAPS, 
+  FAILED_SHUFFLE,
+  MERGED_MAP_OUTPUTS,
+  GC_TIME_MILLIS,
+  CPU_MILLISECONDS,
+  PHYSICAL_MEMORY_BYTES,
+  VIRTUAL_MEMORY_BYTES,
+  COMMITTED_HEAP_BYTES
+}

Added: incubator/tez/tez-common/src/main/java/org/apache/tez/common/counters/TezCounter.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-common/src/main/java/org/apache/tez/common/counters/TezCounter.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-common/src/main/java/org/apache/tez/common/counters/TezCounter.java (added)
+++ incubator/tez/tez-common/src/main/java/org/apache/tez/common/counters/TezCounter.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.common.counters;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * A named counter that tracks the progress of a map/reduce job.
+ *
+ * <p><code>Counters</code> represent global counters, defined either by the
+ * Map-Reduce framework or applications. Each <code>Counter</code> is named by
+ * an {@link Enum} and has a long for the value.</p>
+ *
+ * <p><code>Counters</code> are bunched into Groups, each comprising of
+ * counters from a particular <code>Enum</code> class.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public interface TezCounter extends Writable {
+
+  /**
+   * Set the display name of the counter
+   * @param displayName of the counter
+   * @deprecated (and no-op by default)
+   */
+  @Deprecated
+  void setDisplayName(String displayName);
+
+  /**
+   * @return the name of the counter
+   */
+  String getName();
+
+  /**
+   * Get the display name of the counter.
+   * @return the user facing name of the counter
+   */
+  String getDisplayName();
+
+  /**
+   * What is the current value of this counter?
+   * @return the current value
+   */
+  long getValue();
+
+  /**
+   * Set this counter by the given value
+   * @param value the value to set
+   */
+  void setValue(long value);
+
+  /**
+   * Increment this counter by the given value
+   * @param incr the value to increase this counter by
+   */
+  void increment(long incr);
+ 
+  /**
+   * Return the underlying object if this is a facade.
+   * @return the undelying object.
+   */
+  @Private
+  TezCounter getUnderlyingCounter();
+}

Added: incubator/tez/tez-common/src/main/java/org/apache/tez/common/counters/TezCounters.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-common/src/main/java/org/apache/tez/common/counters/TezCounters.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-common/src/main/java/org/apache/tez/common/counters/TezCounters.java (added)
+++ incubator/tez/tez-common/src/main/java/org/apache/tez/common/counters/TezCounters.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.common.counters;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * <p><code>Counters</code> holds per job/task counters, defined either by the
+ * Map-Reduce framework or applications. Each <code>Counter</code> can be of
+ * any {@link Enum} type.</p>
+ *
+ * <p><code>Counters</code> are bunched into {@link CounterGroup}s, each
+ * comprising of counters from a particular <code>Enum</code> class.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class TezCounters extends AbstractCounters<TezCounter, CounterGroup> {
+
+  // Mix framework group implementation into CounterGroup interface
+  private static class FrameworkGroupImpl<T extends Enum<T>>
+      extends FrameworkCounterGroup<T, TezCounter> implements CounterGroup {
+
+    FrameworkGroupImpl(Class<T> cls) {
+      super(cls);
+    }
+
+    @Override
+    protected FrameworkCounter<T> newCounter(T key) {
+      return new FrameworkCounter<T>(key, getName());
+    }
+
+    @Override
+    public CounterGroupBase<TezCounter> getUnderlyingGroup() {
+      return this;
+    }
+  }
+
+  // Mix generic group implementation into CounterGroup interface
+  // and provide some mandatory group factory methods.
+  private static class GenericGroup extends AbstractCounterGroup<TezCounter>
+      implements CounterGroup {
+
+    GenericGroup(String name, String displayName, Limits limits) {
+      super(name, displayName, limits);
+    }
+
+    @Override
+    protected TezCounter newCounter(String name, String displayName, long value) {
+      return new GenericCounter(name, displayName, value);
+    }
+
+    @Override
+    protected TezCounter newCounter() {
+      return new GenericCounter();
+    }
+
+    @Override
+    public CounterGroupBase<TezCounter> getUnderlyingGroup() {
+      return this;
+    }
+  }
+
+  // Mix file system group implementation into the CounterGroup interface
+  private static class FileSystemGroup extends FileSystemCounterGroup<TezCounter>
+      implements CounterGroup {
+
+    @Override
+    protected TezCounter newCounter(String scheme, FileSystemCounter key) {
+      return new FSCounter(scheme, key);
+    }
+
+    @Override
+    public CounterGroupBase<TezCounter> getUnderlyingGroup() {
+      return this;
+    }
+  }
+
+  /**
+   * Provide factory methods for counter group factory implementation.
+   * See also the GroupFactory in
+   *  {@link org.apache.hadoop.TezCounters.Counters mapred.Counters}
+   */
+  private static class GroupFactory
+      extends CounterGroupFactory<TezCounter, CounterGroup> {
+
+    @Override
+    protected <T extends Enum<T>>
+    FrameworkGroupFactory<CounterGroup>
+        newFrameworkGroupFactory(final Class<T> cls) {
+      return new FrameworkGroupFactory<CounterGroup>() {
+        @Override public CounterGroup newGroup(String name) {
+          return new FrameworkGroupImpl<T>(cls); // impl in this package
+        }
+      };
+    }
+
+    @Override
+    protected CounterGroup newGenericGroup(String name, String displayName,
+                                           Limits limits) {
+      return new GenericGroup(name, displayName, limits);
+    }
+
+    @Override
+    protected CounterGroup newFileSystemGroup() {
+      return new FileSystemGroup();
+    }
+  }
+
+  private static final GroupFactory groupFactory = new GroupFactory();
+
+  /**
+   * Default constructor
+   */
+  public TezCounters() {
+    super(groupFactory);
+  }
+
+  /**
+   * Construct the Counters object from the another counters object
+   * @param <C> the type of counter
+   * @param <G> the type of counter group
+   * @param counters the old counters object
+   */
+  public <C extends TezCounter, G extends CounterGroupBase<C>>
+  TezCounters(AbstractCounters<C, G> counters) {
+    super(counters, groupFactory);
+  }
+}

Added: incubator/tez/tez-common/src/main/java/org/apache/tez/records/TezContainerId.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-common/src/main/java/org/apache/tez/records/TezContainerId.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-common/src/main/java/org/apache/tez/records/TezContainerId.java (added)
+++ incubator/tez/tez-common/src/main/java/org/apache/tez/records/TezContainerId.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.tez.records;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+
+//TODO EVENTUALLY Once everything is on PB, get rid of this.
+//Alternately have the PB interfaces implement Writable.
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class TezContainerId implements Writable {
+
+  private ContainerId containerId;
+
+  public TezContainerId() {
+  }
+  
+  public TezContainerId(ContainerId containerId) {
+    this.containerId = containerId;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeLong(containerId.getApplicationAttemptId().getApplicationId()
+        .getClusterTimestamp());
+    out.writeInt(containerId.getApplicationAttemptId().getApplicationId()
+        .getId());
+    out.writeInt(containerId.getApplicationAttemptId().getAttemptId());
+    out.writeInt(containerId.getId());
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    long timestamp = in.readLong();
+    int appId = in.readInt();
+    int appAttemptId = in.readInt();
+    int id = in.readInt();
+    this.containerId = BuilderUtils.newContainerId(appId, appAttemptId,
+        timestamp, id);
+  }
+
+  @Override
+  public String toString() {
+    return containerId.toString();
+  }
+  
+  public ContainerId getContainerId() {
+    return this.containerId;
+  }
+}

Added: incubator/tez/tez-dist/pom.xml
URL: http://svn.apache.org/viewvc/incubator/tez/tez-dist/pom.xml?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-dist/pom.xml (added)
+++ incubator/tez/tez-dist/pom.xml Fri Mar 15 21:26:36 2013
@@ -0,0 +1,100 @@
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.tez</groupId>
+    <artifactId>tez</artifactId>
+    <version>0.1.0</version>
+  </parent>
+  <groupId>org.apache.tez</groupId>
+  <artifactId>tez-dist</artifactId>
+  <version>0.1.0</version>
+
+  <packaging>pom</packaging>
+
+  <dependencies>
+    <!--tez-yarn-client should require all other modules to be built before it, so this becomes the last -->
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-yarn-client</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-ampool</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+  </dependencies>
+
+  <properties>
+    <package.format>dir</package.format>
+    <!--includeBaseDirectory is not used - replacement does not work in the packaging-->
+    <package.includeBaseDirectory>false</package.includeBaseDirectory>
+  </properties>
+
+  <profiles>
+    <profile>
+      <id>dist-tar</id>
+      <activation>
+        <activeByDefault>false</activeByDefault>
+        <property>
+          <name>tar</name>
+        </property>
+      </activation>
+      <properties>
+        <package.format>tar.gz</package.format>
+        <package.includeBaseDirectory>true</package.includeBaseDirectory>
+      </properties>
+    </profile>
+  </profiles>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-assembly-plugin</artifactId>
+        <configuration>
+          <descriptors>
+            <descriptor>src/main/assembly/tez-dist.xml</descriptor>
+          </descriptors>
+          <tarLongFileMode>gnu</tarLongFileMode>
+          <appendAssemblyId>false</appendAssemblyId>
+          <attach>false</attach>
+          <finalName>tez-${project.version}</finalName>
+        </configuration>
+        <executions>
+          <execution>
+            <id>package-tez</id>
+            <configuration>
+              <formats>
+                <format>${package.format}</format>
+              </formats>
+            </configuration>
+            <phase>prepare-package</phase>
+            <goals>
+              <goal>single</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+
+</project>

Added: incubator/tez/tez-dist/src/main/assembly/tez-dist.xml
URL: http://svn.apache.org/viewvc/incubator/tez/tez-dist/src/main/assembly/tez-dist.xml?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-dist/src/main/assembly/tez-dist.xml (added)
+++ incubator/tez/tez-dist/src/main/assembly/tez-dist.xml Fri Mar 15 21:26:36 2013
@@ -0,0 +1,114 @@
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
+  <id>tez-dist</id>
+  <!--includeBaseDirectory paramter replacement does not seem to work -->
+  <!--includeBaseDirectory>${package.includeBaseDirectory}</includeBaseDirectory-->
+  <!--fileSets>
+    <fileSet>
+      <directory>${project.build.directory}</directory>
+      <outputDirectory>/</outputDirectory>
+      <includes>
+        <include>${project.artifactId}-${project.version}.jar</include>
+      </includes>
+    </fileSet>
+  </fileSets-->
+  <moduleSets>
+    <moduleSet>
+      <useAllReactorProjects>true</useAllReactorProjects>
+      <includes>
+        <include>org.apache.tez:tez-yarn-application</include>
+        <include>org.apache.tez:tez-yarn-client</include>
+        <include>org.apache.tez:tez-ampool</include>
+      </includes>
+      <binaries>
+        <outputDirectory>/</outputDirectory>
+        <includeDependencies>false</includeDependencies>
+        <unpack>false</unpack>
+      </binaries>
+      <sources>
+        <includeModuleDirectory>false</includeModuleDirectory>
+        <fileSets>
+          <fileSet>
+            <directory>src/main/bin</directory>
+            <outputDirectory>/bin</outputDirectory>
+            <excludes>
+              <exclude>*.sh</exclude>
+            </excludes>
+            <fileMode>0755</fileMode>
+          </fileSet>
+          <fileSet>
+            <directory>src/main/bin</directory>
+            <outputDirectory>/libexec</outputDirectory>
+            <includes>
+              <include>*-config.sh</include>
+            </includes>
+            <fileMode>0755</fileMode>
+          </fileSet>
+          <fileSet>
+            <directory>src/main/bin</directory>
+            <outputDirectory>/sbin</outputDirectory>
+            <includes>
+              <include>*.sh</include>
+            </includes>
+            <excludes>
+              <exclude>*-config.sh</exclude>
+            </excludes>
+            <fileMode>0755</fileMode>
+          </fileSet>
+          <fileSet>
+            <directory>src/main/conf</directory>
+            <outputDirectory>/conf</outputDirectory>
+            <includes>
+              <include>*</include>
+            </includes>
+            <fileMode>0755</fileMode>
+          </fileSet>
+        </fileSets>
+      </sources>
+    </moduleSet>
+  </moduleSets>
+  <dependencySets>
+    <dependencySet>
+      <useProjectArtifact>false</useProjectArtifact>
+      <outputDirectory>/lib</outputDirectory>
+      <excludes>
+        <!-- Remove dups from top-level -->
+        <exclude>org.apache.tez:tez-yarn-application</exclude>
+        <exclude>org.apache.tez:tez-yarn-client</exclude>
+        <exclude>org.apache.tez:tez-ampool</exclude>
+        <!-- Exclude hadoop artifacts. They will be found via HADOOP* env -->
+        <exclude>org.apache.hadoop:hadoop-common</exclude>
+        <exclude>org.apache.hadoop:hadoop-auth</exclude>
+        <exclude>org.apache.hadoop:hadoop-annotations</exclude>
+        <!--exclude>org.apache.hadoop:hadoop-mapreduce-client-common</exclude-->
+        <!--exclude>org.apache.hadoop:hadoop-mapreduce-client-core</exclude-->
+        <!--exclude>org.apache.hadoop:hadoop-mapreduce-client-shuffle</exclude-->
+        <exclude>org.apache.hadoop:hadoop-yarn-api</exclude>
+        <exclude>org.apache.hadoop:hadoop-yarn-client</exclude>
+        <exclude>org.apache.hadoop:hadoop-yarn-common</exclude>
+        <exclude>org.apache.hadoop:hadoop-yarn-server-common</exclude>
+        <exclude>org.apache.hadoop:hadoop-yarn-server-nodemanager</exclude>
+        <!-- use slf4j from common to avoid multiple binding warnings -->
+        <exclude>org.slf4j:slf4j-api</exclude>
+        <exclude>org.slf4j:slf4j-log4j12</exclude>
+      </excludes>
+    </dependencySet>
+  </dependencySets>
+</assembly>

Added: incubator/tez/tez-engine/.classpath
URL: http://svn.apache.org/viewvc/incubator/tez/tez-engine/.classpath?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-engine/.classpath (added)
+++ incubator/tez/tez-engine/.classpath Fri Mar 15 21:26:36 2013
@@ -0,0 +1,83 @@
+<classpath>
+  <classpathentry kind="src" path="src/main/java" including="**/*.java"/>
+  <classpathentry kind="output" path="target/classes"/>
+  <classpathentry kind="var" path="M2_REPO/javax/activation/activation/1.1/activation-1.1.jar"/>
+  <classpathentry kind="var" path="M2_REPO/javax/inject/javax.inject/1/javax.inject-1.jar"/>
+  <classpathentry kind="var" path="M2_REPO/javax/xml/bind/jaxb-api/2.2.2/jaxb-api-2.2.2.jar"/>
+  <classpathentry kind="var" path="M2_REPO/javax/servlet/jsp/jsp-api/2.1/jsp-api-2.1.jar"/>
+  <classpathentry kind="var" path="M2_REPO/javax/servlet/servlet-api/2.5/servlet-api-2.5.jar"/>
+  <classpathentry kind="var" path="M2_REPO/aopalliance/aopalliance/1.0/aopalliance-1.0.jar"/>
+  <classpathentry kind="var" path="M2_REPO/asm/asm/3.1/asm-3.1.jar"/>
+  <classpathentry kind="var" path="M2_REPO/org/apache/avro/avro/1.5.3/avro-1.5.3.jar"/>
+  <classpathentry kind="var" path="M2_REPO/commons-beanutils/commons-beanutils/1.7.0/commons-beanutils-1.7.0.jar"/>
+  <classpathentry kind="var" path="M2_REPO/commons-beanutils/commons-beanutils-core/1.8.0/commons-beanutils-core-1.8.0.jar"/>
+  <classpathentry kind="var" path="M2_REPO/commons-cli/commons-cli/1.2/commons-cli-1.2.jar"/>
+  <classpathentry kind="var" path="M2_REPO/commons-codec/commons-codec/1.4/commons-codec-1.4.jar"/>
+  <classpathentry kind="var" path="M2_REPO/commons-collections/commons-collections/3.2.1/commons-collections-3.2.1.jar"/>
+  <classpathentry kind="var" path="M2_REPO/org/apache/commons/commons-compress/1.4/commons-compress-1.4.jar"/>
+  <classpathentry kind="var" path="M2_REPO/commons-configuration/commons-configuration/1.6/commons-configuration-1.6.jar"/>
+  <classpathentry kind="var" path="M2_REPO/commons-digester/commons-digester/1.8/commons-digester-1.8.jar"/>
+  <classpathentry kind="var" path="M2_REPO/commons-el/commons-el/1.0/commons-el-1.0.jar"/>
+  <classpathentry kind="var" path="M2_REPO/commons-httpclient/commons-httpclient/3.1/commons-httpclient-3.1.jar"/>
+  <classpathentry kind="var" path="M2_REPO/commons-io/commons-io/2.1/commons-io-2.1.jar"/>
+  <classpathentry kind="var" path="M2_REPO/commons-lang/commons-lang/2.5/commons-lang-2.5.jar"/>
+  <classpathentry kind="var" path="M2_REPO/commons-logging/commons-logging/1.1.1/commons-logging-1.1.1.jar"/>
+  <classpathentry kind="var" path="M2_REPO/org/apache/commons/commons-math/2.1/commons-math-2.1.jar"/>
+  <classpathentry kind="var" path="M2_REPO/commons-net/commons-net/3.1/commons-net-3.1.jar"/>
+  <classpathentry kind="var" path="M2_REPO/com/google/guava/guava/11.0.2/guava-11.0.2.jar"/>
+  <classpathentry kind="var" path="M2_REPO/com/google/inject/guice/3.0/guice-3.0.jar"/>
+  <classpathentry kind="var" path="M2_REPO/com/google/inject/extensions/guice-assistedinject/3.0/guice-assistedinject-3.0.jar"/>
+  <classpathentry kind="var" path="M2_REPO/org/apache/hadoop/hadoop-annotations/3.0.0-SNAPSHOT/hadoop-annotations-3.0.0-SNAPSHOT.jar"/>
+  <classpathentry kind="var" path="M2_REPO/org/apache/hadoop/hadoop-auth/3.0.0-SNAPSHOT/hadoop-auth-3.0.0-SNAPSHOT.jar"/>
+  <classpathentry kind="var" path="M2_REPO/org/apache/hadoop/hadoop-common/3.0.0-SNAPSHOT/hadoop-common-3.0.0-SNAPSHOT.jar" sourcepath="M2_REPO/org/apache/hadoop/hadoop-common/3.0.0-SNAPSHOT/hadoop-common-3.0.0-SNAPSHOT-sources.jar">
+    <attributes>
+      <attribute value="jar:file:/Users/acmurthy/.m2/repository/org/apache/hadoop/hadoop-common/3.0.0-SNAPSHOT/hadoop-common-3.0.0-SNAPSHOT-javadoc.jar!/" name="javadoc_location"/>
+    </attributes>
+  </classpathentry>
+  <classpathentry kind="var" path="M2_REPO/org/apache/hadoop/hadoop-yarn-api/3.0.0-SNAPSHOT/hadoop-yarn-api-3.0.0-SNAPSHOT.jar" sourcepath="M2_REPO/org/apache/hadoop/hadoop-yarn-api/3.0.0-SNAPSHOT/hadoop-yarn-api-3.0.0-SNAPSHOT-sources.jar">
+    <attributes>
+      <attribute value="jar:file:/Users/acmurthy/.m2/repository/org/apache/hadoop/hadoop-yarn-api/3.0.0-SNAPSHOT/hadoop-yarn-api-3.0.0-SNAPSHOT-javadoc.jar!/" name="javadoc_location"/>
+    </attributes>
+  </classpathentry>
+  <classpathentry kind="var" path="M2_REPO/org/apache/hadoop/hadoop-yarn-common/3.0.0-SNAPSHOT/hadoop-yarn-common-3.0.0-SNAPSHOT.jar" sourcepath="M2_REPO/org/apache/hadoop/hadoop-yarn-common/3.0.0-SNAPSHOT/hadoop-yarn-common-3.0.0-SNAPSHOT-sources.jar">
+    <attributes>
+      <attribute value="jar:file:/Users/acmurthy/.m2/repository/org/apache/hadoop/hadoop-yarn-common/3.0.0-SNAPSHOT/hadoop-yarn-common-3.0.0-SNAPSHOT-javadoc.jar!/" name="javadoc_location"/>
+    </attributes>
+  </classpathentry>
+  <classpathentry kind="var" path="M2_REPO/org/apache/hadoop/hadoop-yarn-server-nodemanager/3.0.0-SNAPSHOT/hadoop-yarn-server-nodemanager-3.0.0-SNAPSHOT.jar" sourcepath="M2_REPO/org/apache/hadoop/hadoop-yarn-server-nodemanager/3.0.0-SNAPSHOT/hadoop-yarn-server-nodemanager-3.0.0-SNAPSHOT-sources.jar">
+    <attributes>
+      <attribute value="jar:file:/Users/acmurthy/.m2/repository/org/apache/hadoop/hadoop-yarn-server-nodemanager/3.0.0-SNAPSHOT/hadoop-yarn-server-nodemanager-3.0.0-SNAPSHOT-javadoc.jar!/" name="javadoc_location"/>
+    </attributes>
+  </classpathentry>
+  <classpathentry kind="var" path="M2_REPO/org/codehaus/jackson/jackson-core-asl/1.8.8/jackson-core-asl-1.8.8.jar"/>
+  <classpathentry kind="var" path="M2_REPO/org/codehaus/jackson/jackson-jaxrs/1.7.1/jackson-jaxrs-1.7.1.jar"/>
+  <classpathentry kind="var" path="M2_REPO/org/codehaus/jackson/jackson-mapper-asl/1.8.8/jackson-mapper-asl-1.8.8.jar"/>
+  <classpathentry kind="var" path="M2_REPO/org/codehaus/jackson/jackson-xc/1.7.1/jackson-xc-1.7.1.jar"/>
+  <classpathentry kind="var" path="M2_REPO/tomcat/jasper-compiler/5.5.23/jasper-compiler-5.5.23.jar"/>
+  <classpathentry kind="var" path="M2_REPO/tomcat/jasper-runtime/5.5.23/jasper-runtime-5.5.23.jar"/>
+  <classpathentry kind="var" path="M2_REPO/com/sun/xml/bind/jaxb-impl/2.2.3-1/jaxb-impl-2.2.3-1.jar"/>
+  <classpathentry kind="var" path="M2_REPO/com/sun/jersey/jersey-core/1.8/jersey-core-1.8.jar"/>
+  <classpathentry kind="var" path="M2_REPO/com/sun/jersey/jersey-json/1.8/jersey-json-1.8.jar"/>
+  <classpathentry kind="var" path="M2_REPO/com/sun/jersey/jersey-server/1.8/jersey-server-1.8.jar"/>
+  <classpathentry kind="var" path="M2_REPO/net/java/dev/jets3t/jets3t/0.6.1/jets3t-0.6.1.jar"/>
+  <classpathentry kind="var" path="M2_REPO/org/codehaus/jettison/jettison/1.1/jettison-1.1.jar"/>
+  <classpathentry kind="var" path="M2_REPO/org/mortbay/jetty/jetty/6.1.26/jetty-6.1.26.jar"/>
+  <classpathentry kind="var" path="M2_REPO/org/mortbay/jetty/jetty-util/6.1.26/jetty-util-6.1.26.jar"/>
+  <classpathentry kind="var" path="M2_REPO/jline/jline/0.9.94/jline-0.9.94.jar"/>
+  <classpathentry kind="var" path="M2_REPO/com/jcraft/jsch/0.1.42/jsch-0.1.42.jar"/>
+  <classpathentry kind="var" path="M2_REPO/com/google/code/findbugs/jsr305/1.3.9/jsr305-1.3.9.jar"/>
+  <classpathentry kind="var" path="M2_REPO/log4j/log4j/1.2.17/log4j-1.2.17.jar"/>
+  <classpathentry kind="var" path="M2_REPO/org/jboss/netty/netty/3.2.2.Final/netty-3.2.2.Final.jar"/>
+  <classpathentry kind="var" path="M2_REPO/com/thoughtworks/paranamer/paranamer/2.3/paranamer-2.3.jar"/>
+  <classpathentry kind="var" path="M2_REPO/com/google/protobuf/protobuf-java/2.4.0a/protobuf-java-2.4.0a.jar"/>
+  <classpathentry kind="var" path="M2_REPO/org/slf4j/slf4j-api/1.6.1/slf4j-api-1.6.1.jar"/>
+  <classpathentry kind="var" path="M2_REPO/org/slf4j/slf4j-log4j12/1.6.1/slf4j-log4j12-1.6.1.jar"/>
+  <classpathentry kind="var" path="M2_REPO/org/xerial/snappy/snappy-java/1.0.3.2/snappy-java-1.0.3.2.jar"/>
+  <classpathentry kind="var" path="M2_REPO/stax/stax-api/1.0.1/stax-api-1.0.1.jar"/>
+  <classpathentry kind="src" path="/tez-api"/>
+  <classpathentry kind="src" path="/tez-common"/>
+  <classpathentry kind="var" path="M2_REPO/xmlenc/xmlenc/0.52/xmlenc-0.52.jar"/>
+  <classpathentry kind="var" path="M2_REPO/org/tukaani/xz/1.0/xz-1.0.jar"/>
+  <classpathentry kind="var" path="M2_REPO/org/apache/zookeeper/zookeeper/3.4.2/zookeeper-3.4.2.jar"/>
+  <classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER"/>
+</classpath>
\ No newline at end of file

Added: incubator/tez/tez-engine/.project
URL: http://svn.apache.org/viewvc/incubator/tez/tez-engine/.project?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-engine/.project (added)
+++ incubator/tez/tez-engine/.project Fri Mar 15 21:26:36 2013
@@ -0,0 +1,16 @@
+<projectDescription>
+  <name>tez-engine</name>
+  <comment>NO_M2ECLIPSE_SUPPORT: Project files created with the maven-eclipse-plugin are not supported in M2Eclipse.</comment>
+  <projects>
+    <project>tez-api</project>
+    <project>tez-common</project>
+  </projects>
+  <buildSpec>
+    <buildCommand>
+      <name>org.eclipse.jdt.core.javabuilder</name>
+    </buildCommand>
+  </buildSpec>
+  <natures>
+    <nature>org.eclipse.jdt.core.javanature</nature>
+  </natures>
+</projectDescription>
\ No newline at end of file

Added: incubator/tez/tez-engine/pom.xml
URL: http://svn.apache.org/viewvc/incubator/tez/tez-engine/pom.xml?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-engine/pom.xml (added)
+++ incubator/tez/tez-engine/pom.xml Fri Mar 15 21:26:36 2013
@@ -0,0 +1,49 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License. See accompanying LICENSE file.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.tez</groupId>
+    <artifactId>tez</artifactId>
+    <version>0.1.0</version>
+  </parent>
+  <artifactId>tez-engine</artifactId>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-api</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-common</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-common</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.google.inject.extensions</groupId>
+      <artifactId>guice-assistedinject</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-server-nodemanager</artifactId>
+    </dependency>
+  </dependencies>
+</project>

Added: incubator/tez/tez-engine/src/main/java/org/apache/hadoop/io/BufferUtils.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-engine/src/main/java/org/apache/hadoop/io/BufferUtils.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-engine/src/main/java/org/apache/hadoop/io/BufferUtils.java (added)
+++ incubator/tez/tez-engine/src/main/java/org/apache/hadoop/io/BufferUtils.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+@InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"})
+@InterfaceStability.Unstable
+public class BufferUtils {
+  public static int compare(DataInputBuffer buf1, DataInputBuffer buf2) {
+    byte[] b1 = buf1.getData();
+    byte[] b2 = buf2.getData();
+    int s1 = buf1.getPosition();
+    int s2 = buf2.getPosition();
+    int l1 = buf1.getLength();
+    int l2 = buf2.getLength();
+    return FastByteComparisons.compareTo(b1, s1, l1, b2, s2, l2);
+  }
+
+  public static int compare(DataOutputBuffer buf1, DataOutputBuffer buf2) {
+    byte[] b1 = buf1.getData();
+    byte[] b2 = buf2.getData();
+    int s1 = 0;
+    int s2 = 0;
+    int l1 = buf1.getLength();
+    int l2 = buf2.getLength();
+    return FastByteComparisons.compareTo(b1, s1, l1, b2, s2, l2);
+  }
+
+  public static int compare(DataInputBuffer buf1, DataOutputBuffer buf2) {
+    byte[] b1 = buf1.getData();
+    byte[] b2 = buf2.getData();
+    int s1 = buf1.getPosition();    
+    int s2 = 0;
+    int l1 = buf1.getLength();
+    int l2 = buf2.getLength();
+    return FastByteComparisons.compareTo(b1, s1, l1, b2, s2, l2);
+  }
+
+  public static int compare(DataOutputBuffer buf1, DataInputBuffer buf2) {
+    return compare(buf2, buf1);
+  }
+
+  public static void copy(DataInputBuffer src, DataOutputBuffer dst) 
+                              throws IOException {
+    byte[] b1 = src.getData();
+    int s1 = src.getPosition();    
+    int l1 = src.getLength();
+    dst.reset();
+    dst.write(b1, s1, l1 - s1);
+  }
+
+  public static void copy(DataOutputBuffer src, DataOutputBuffer dst) 
+                              throws IOException {
+    byte[] b1 = src.getData();
+    int s1 = 0;
+    int l1 = src.getLength();
+    dst.reset();
+    dst.write(b1, s1, l1);
+  }
+
+}

Added: incubator/tez/tez-engine/src/main/java/org/apache/hadoop/io/HashComparator.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-engine/src/main/java/org/apache/hadoop/io/HashComparator.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-engine/src/main/java/org/apache/hadoop/io/HashComparator.java (added)
+++ incubator/tez/tez-engine/src/main/java/org/apache/hadoop/io/HashComparator.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io;
+
+public interface HashComparator<KEY> {
+
+  int getHashCode(KEY key);
+
+}

Added: incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/ConfigUtils.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/ConfigUtils.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/ConfigUtils.java (added)
+++ incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/ConfigUtils.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,101 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.engine.common;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.util.ReflectionUtils;
+
+@SuppressWarnings({"unchecked", "rawtypes"})
+public class ConfigUtils {
+  public static  Class<? extends CompressionCodec> getMapOutputCompressorClass(
+      Configuration conf, Class<DefaultCodec> class1) {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
+  public static  boolean getCompressMapOutput(Configuration conf) {
+    // TODO Auto-generated method stub
+    return false;
+  }
+
+  public static <V> Class<V> getMapOutputValueClass(Configuration conf) {
+    Class<V> retv = 
+        (Class<V>) 
+        conf.getClass("mapreduce.map.output.value.class", null, Object.class);
+    if (retv == null) {
+      retv = getOutputValueClass(conf);
+    }
+    return retv;
+  }
+
+  public static <V> Class<V> getOutputValueClass(Configuration conf) {
+    return (Class<V>) conf.getClass(
+        "mapreduce.job.output.value.class", Text.class, Object.class);
+  }
+
+  public static <K> Class<K> getMapOutputKeyClass(Configuration conf) {
+    Class<K> retv = 
+        (Class<K>) conf.getClass("mapreduce.map.output.key.class", null, Object.class);
+    if (retv == null) {
+      retv = getOutputKeyClass(conf);
+    }
+    return 
+        retv;
+  }
+
+  public static <K> Class<K> getOutputKeyClass(Configuration conf) {
+    return 
+        (Class<K>) 
+        conf.getClass(
+            "mapreduce.job.output.key.class", 
+            LongWritable.class, Object.class);
+}
+  
+  public static <K> RawComparator<K> getOutputKeyComparator(Configuration conf) {
+    Class<? extends RawComparator> theClass = 
+        conf.getClass(
+            "mapreduce.job.output.key.comparator.class", null, 
+            RawComparator.class);
+      if (theClass != null)
+        return ReflectionUtils.newInstance(theClass, conf);
+      return WritableComparator.get(
+          getMapOutputKeyClass(conf).asSubclass(WritableComparable.class));
+    }
+
+  public static <V> RawComparator<V> getOutputValueGroupingComparator(
+      Configuration conf) {
+    Class<? extends RawComparator> theClass = 
+        conf.getClass(
+            "mapreduce.job.output.group.comparator.class", 
+            null, RawComparator.class);
+    if (theClass == null) {
+      return getOutputKeyComparator(conf);
+    }
+
+    return ReflectionUtils.newInstance(theClass, conf);
+  }
+
+}

Added: incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/YARNMaster.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/YARNMaster.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/YARNMaster.java (added)
+++ incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/YARNMaster.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,57 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.engine.common;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+@Private
+@Unstable
+public class YARNMaster {
+  
+  public enum State {
+    INITIALIZING, RUNNING;
+  }
+
+  public static String getMasterUserName(Configuration conf) {
+    return conf.get(YarnConfiguration.RM_PRINCIPAL);
+  }
+  
+  public static InetSocketAddress getMasterAddress(Configuration conf) {
+    return conf.getSocketAddr(
+        YarnConfiguration.RM_ADDRESS,
+        YarnConfiguration.DEFAULT_RM_ADDRESS,
+        YarnConfiguration.DEFAULT_RM_PORT);
+  }
+
+  public static String getMasterPrincipal(Configuration conf) 
+  throws IOException {
+    String masterHostname = getMasterAddress(conf).getHostName();
+    // get kerberos principal for use as delegation token renewer
+    return SecurityUtil.getServerPrincipal(
+        getMasterUserName(conf), masterHostname);
+  }
+  
+}

Added: incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/combine/CombineInput.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/combine/CombineInput.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/combine/CombineInput.java (added)
+++ incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/combine/CombineInput.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.engine.common.combine;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.serializer.Deserializer;
+import org.apache.hadoop.util.Progressable;
+import org.apache.tez.api.Input;
+import org.apache.tez.api.Master;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;
+
+@SuppressWarnings({ "unchecked", "rawtypes" })
+public class CombineInput implements Input {
+
+  private final TezRawKeyValueIterator input;
+  private TezCounter inputValueCounter;
+  private TezCounter inputKeyCounter;
+  private RawComparator<Object> comparator;
+  private Object key;                                  // current key
+  private Object value;                              // current value
+  private boolean firstValue = false;                 // first value in key
+  private boolean nextKeyIsSame = false;              // more w/ this key
+  private boolean hasMore;                            // more in file
+  protected Progressable reporter;
+  private Deserializer keyDeserializer;
+  private Deserializer valueDeserializer;
+  private DataInputBuffer buffer = new DataInputBuffer();
+  private BytesWritable currentRawKey = new BytesWritable();
+  private ValueIterable iterable = new ValueIterable();
+  
+  public CombineInput(TezRawKeyValueIterator kvIter) {
+    this.input = kvIter;
+  }
+
+  public void initialize(Configuration conf, Master master) throws IOException,
+      InterruptedException {
+  }
+
+  public boolean hasNext() throws IOException, InterruptedException {
+    while (hasMore && nextKeyIsSame) {
+      nextKeyValue();
+    }
+    if (hasMore) {
+      if (inputKeyCounter != null) {
+        inputKeyCounter.increment(1);
+      }
+      return nextKeyValue();
+    } else {
+      return false;
+    }
+  }
+
+  private boolean nextKeyValue() throws IOException, InterruptedException {
+    if (!hasMore) {
+      key = null;
+      value = null;
+      return false;
+    }
+    firstValue = !nextKeyIsSame;
+    DataInputBuffer nextKey = input.getKey();
+    currentRawKey.set(nextKey.getData(), nextKey.getPosition(), 
+                      nextKey.getLength() - nextKey.getPosition());
+    buffer.reset(currentRawKey.getBytes(), 0, currentRawKey.getLength());
+    key = keyDeserializer.deserialize(key);
+    DataInputBuffer nextVal = input.getValue();
+    buffer.reset(nextVal.getData(), nextVal.getPosition(), nextVal.getLength());
+    value = valueDeserializer.deserialize(value);
+
+    hasMore = input.next();
+    if (hasMore) {
+      nextKey = input.getKey();
+      nextKeyIsSame = comparator.compare(currentRawKey.getBytes(), 0, 
+                                     currentRawKey.getLength(),
+                                     nextKey.getData(),
+                                     nextKey.getPosition(),
+                                     nextKey.getLength() - nextKey.getPosition()
+                                         ) == 0;
+    } else {
+      nextKeyIsSame = false;
+    }
+    inputValueCounter.increment(1);
+    return true;
+  }
+
+  public Object getNextKey() throws IOException, InterruptedException {
+    return key;
+  }
+
+  public Iterable getNextValues() throws IOException,
+      InterruptedException {
+    return iterable;
+  }
+
+  public float getProgress() throws IOException, InterruptedException {
+    return input.getProgress().getProgress();
+  }
+
+  public void close() throws IOException {
+    input.close();
+  }
+  
+  protected class ValueIterator implements Iterator<Object> {
+
+
+    public boolean hasNext() {
+      return firstValue || nextKeyIsSame;
+    }
+
+    public Object next() {
+
+      // if this is the first record, we don't need to advance
+      if (firstValue) {
+        firstValue = false;
+        return value;
+      }
+      // if this isn't the first record and the next key is different, they
+      // can't advance it here.
+      if (!nextKeyIsSame) {
+        throw new NoSuchElementException("iterate past last value");
+      }
+      // otherwise, go to the next key/value pair
+      try {
+        nextKeyValue();
+        return value;
+      } catch (IOException ie) {
+        throw new RuntimeException("next value iterator failed", ie);
+      } catch (InterruptedException ie) {
+        // this is bad, but we can't modify the exception list of java.util
+        throw new RuntimeException("next value iterator interrupted", ie);        
+      }
+    }
+
+    public void remove() {
+      throw new UnsupportedOperationException("remove not implemented");
+    }
+  }
+
+
+  
+  protected class ValueIterable implements Iterable<Object> {
+    private ValueIterator iterator = new ValueIterator();
+    public Iterator<Object> iterator() {
+      return iterator;
+    } 
+  }
+  
+
+
+}

Added: incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/combine/CombineOutput.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/combine/CombineOutput.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/combine/CombineOutput.java (added)
+++ incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/combine/CombineOutput.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.engine.common.combine;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.api.Master;
+import org.apache.tez.api.Output;
+import org.apache.tez.engine.common.sort.impl.IFile.Writer;
+import org.apache.tez.records.OutputContext;
+
+public class CombineOutput implements Output {
+
+  private final Writer writer;
+  
+  public CombineOutput(Writer writer) {
+    this.writer = writer;
+  }
+
+  public void initialize(Configuration conf, Master master) throws IOException,
+      InterruptedException {
+    // TODO Auto-generated method stub
+
+  }
+
+  public void write(Object key, Object value) throws IOException,
+      InterruptedException {
+    writer.append(key, value);
+  }
+
+  @Override
+  public OutputContext getOutputContext() {
+    return null;
+  }
+  
+  public void close() throws IOException, InterruptedException {
+    writer.close();
+  }
+}

Added: incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/localshuffle/LocalShuffle.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/localshuffle/LocalShuffle.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/localshuffle/LocalShuffle.java (added)
+++ incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/localshuffle/LocalShuffle.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,127 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.engine.common.localshuffle;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.common.TezTask;
+import org.apache.tez.common.TezTaskReporter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.engine.common.ConfigUtils;
+import org.apache.tez.engine.common.sort.impl.TezMerger;
+import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;
+import org.apache.tez.engine.common.task.local.output.TezLocalTaskOutputFiles;
+import org.apache.tez.engine.common.task.local.output.TezTaskOutput;
+
+@SuppressWarnings({"rawtypes"})
+public class LocalShuffle {
+
+  private final TezTask task;
+  private final Configuration conf;
+  private final int tasksInDegree;
+
+  private final Class keyClass;
+  private final Class valClass;
+  private final RawComparator comparator;
+
+  private final FileSystem rfs;
+  private final int sortFactor;
+  
+  private final TezCounter spilledRecordsCounter;
+  private final CompressionCodec codec;
+  private final TezTaskOutput mapOutputFile;
+
+  public LocalShuffle(TezTask task, 
+      Configuration conf,
+      TezTaskReporter reporter
+      ) throws IOException {
+    this.task = task;
+    this.conf = conf;
+    this.keyClass = ConfigUtils.getMapOutputKeyClass(conf);
+    this.valClass = ConfigUtils.getMapOutputValueClass(conf);
+    this.comparator = ConfigUtils.getOutputKeyComparator(conf);
+
+    this.sortFactor =
+        conf.getInt(
+            TezJobConfig.TEZ_ENGINE_IO_SORT_FACTOR, 
+            TezJobConfig.DEFAULT_TEZ_ENGINE_IO_SORT_FACTOR);
+    
+    this.rfs = FileSystem.getLocal(conf).getRaw();
+
+    this.spilledRecordsCounter = 
+        reporter.getCounter(TaskCounter.SPILLED_RECORDS);
+    
+    // compression
+    if (ConfigUtils.getCompressMapOutput(conf)) {
+      Class<? extends CompressionCodec> codecClass =
+          ConfigUtils.getMapOutputCompressorClass(conf, DefaultCodec.class);
+      this.codec = ReflectionUtils.newInstance(codecClass, conf);
+    } else {
+      this.codec = null;
+    }
+
+    this.tasksInDegree = 
+        conf.getInt(
+            TezJobConfig.TEZ_ENGINE_TASK_INDEGREE, 
+            TezJobConfig.DEFAULT_TEZ_ENGINE_TASK_INDEGREE);
+
+    // Always local
+    this.mapOutputFile = new TezLocalTaskOutputFiles();
+    this.mapOutputFile.setConf(conf);
+
+  }
+  
+  public TezRawKeyValueIterator run() throws IOException {
+    // Copy is complete, obviously! 
+    this.task.getProgress().addPhase("copy", 0.33f).complete();
+
+    // Merge
+    return TezMerger.merge(conf, rfs, 
+        keyClass, valClass,
+        codec, 
+        getMapFiles(),
+        false, 
+        sortFactor,
+        new Path(task.getTaskAttemptId().toString()), 
+        comparator,
+        task.getTaskReporter(), spilledRecordsCounter, null, null);
+  }
+  
+  private Path[] getMapFiles() 
+  throws IOException {
+    List<Path> fileList = new ArrayList<Path>();
+      // for local jobs
+      for(int i = 0; i < tasksInDegree; ++i) {
+        fileList.add(mapOutputFile.getInputFile(i));
+      }
+      
+    return fileList.toArray(new Path[0]);
+  }
+}

Added: incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/security/DelegationTokenRenewal.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/security/DelegationTokenRenewal.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/security/DelegationTokenRenewal.java (added)
+++ incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/security/DelegationTokenRenewal.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,318 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.common.security;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.tez.records.TezJobID;
+
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class DelegationTokenRenewal {
+  private static final Log LOG = LogFactory.getLog(DelegationTokenRenewal.class);
+  public static final String SCHEME = "hdfs";
+  
+  /**
+   * class that is used for keeping tracks of DT to renew
+   *
+   */
+  private static class DelegationTokenToRenew {
+    public final Token<?> token;
+    public final TezJobID jobId;
+    public final Configuration conf;
+    public long expirationDate;
+    public TimerTask timerTask;
+    
+    public DelegationTokenToRenew(
+        TezJobID jId, Token<?> t, 
+        Configuration newConf, long newExpirationDate) {
+      token = t;
+      jobId = jId;
+      conf = newConf;
+      expirationDate = newExpirationDate;
+      timerTask = null;
+      if(token==null || jobId==null || conf==null) {
+        throw new IllegalArgumentException("invalid params for Renew Token" +
+            ";t="+token+";j="+jobId+";c="+conf);
+      }
+    }
+    public void setTimerTask(TimerTask tTask) {
+      timerTask = tTask;
+    }
+    @Override
+    public String toString() {
+      return token + ";exp="+expirationDate;
+    }
+    @Override
+    public boolean equals (Object obj) {
+      if (obj == this) {
+        return true;
+      } else if (obj == null || getClass() != obj.getClass()) {
+        return false;
+      } else {
+        return token.equals(((DelegationTokenToRenew)obj).token);
+      }
+    }
+    @Override
+    public int hashCode() {
+      return token.hashCode();
+    }
+  }
+  
+  // global single timer (daemon)
+  private static Timer renewalTimer = new Timer(true);
+  
+  //delegation token canceler thread
+  private static DelegationTokenCancelThread dtCancelThread =
+    new DelegationTokenCancelThread();
+  static {
+    dtCancelThread.start();
+  }
+
+  
+  //managing the list of tokens using Map
+  // jobId=>List<tokens>
+  private static Set<DelegationTokenToRenew> delegationTokens = 
+    Collections.synchronizedSet(new HashSet<DelegationTokenToRenew>());
+  
+  private static class DelegationTokenCancelThread extends Thread {
+    private static class TokenWithConf {
+      Token<?> token;
+      Configuration conf;
+      TokenWithConf(Token<?> token, Configuration conf) {
+        this.token = token;
+        this.conf = conf;
+      }
+    }
+    private LinkedBlockingQueue<TokenWithConf> queue =  
+      new LinkedBlockingQueue<TokenWithConf>();
+     
+    public DelegationTokenCancelThread() {
+      super("Delegation Token Canceler");
+      setDaemon(true);
+    }
+    public void cancelToken(Token<?> token,  
+        Configuration conf) {
+      TokenWithConf tokenWithConf = new TokenWithConf(token, conf);
+      while (!queue.offer(tokenWithConf)) {
+        LOG.warn("Unable to add token " + token + " for cancellation. " +
+        		 "Will retry..");
+        try {
+          Thread.sleep(100);
+        } catch (InterruptedException e) {
+          throw new RuntimeException(e);
+        }
+      }
+    }
+
+    public void run() {
+      while (true) {
+        TokenWithConf tokenWithConf = null;
+        try {
+          tokenWithConf = queue.take();
+          final TokenWithConf current = tokenWithConf;
+          
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Canceling token " + tokenWithConf.token.getService());
+          }
+          // need to use doAs so that http can find the kerberos tgt
+          UserGroupInformation.getLoginUser().doAs(
+              new PrivilegedExceptionAction<Void>() {
+
+                @Override
+                public Void run() throws Exception {
+                  current.token.cancel(current.conf);
+                  return null;
+                }
+              });
+        } catch (IOException e) {
+          LOG.warn("Failed to cancel token " + tokenWithConf.token + " " +  
+              StringUtils.stringifyException(e));
+        } catch (InterruptedException ie) {
+          return;
+        } catch (Throwable t) {
+          LOG.warn("Got exception " + StringUtils.stringifyException(t) + 
+                   ". Exiting..");
+          System.exit(-1);
+        }
+      }
+    }
+  }
+  //adding token
+  private static void addTokenToList(DelegationTokenToRenew t) {
+    delegationTokens.add(t);
+  }
+  
+  public static synchronized void registerDelegationTokensForRenewal(
+      TezJobID jobId, Credentials ts, Configuration conf) throws IOException {
+    if(ts==null)
+      return; //nothing to add
+    
+    Collection <Token<?>> tokens = ts.getAllTokens();
+    long now = System.currentTimeMillis();
+
+    for (Token<?> t : tokens) {
+      // first renew happens immediately
+      if (t.isManaged()) {
+        DelegationTokenToRenew dtr = new DelegationTokenToRenew(jobId, t, conf,
+            now);
+
+        addTokenToList(dtr);
+
+        setTimerForTokenRenewal(dtr, true);
+        LOG.info("registering token for renewal for service =" + t.getService()
+            + " and jobID = " + jobId);
+      }
+    }
+  }
+    
+  /**
+   * Task - to renew a token
+   *
+   */
+  private static class RenewalTimerTask extends TimerTask {
+    private DelegationTokenToRenew dttr;
+    
+    RenewalTimerTask(DelegationTokenToRenew t) {  dttr = t;  }
+    
+    @Override
+    public void run() {
+      Token<?> token = dttr.token;
+      long newExpirationDate=0;
+      try {
+        // need to use doAs so that http can find the kerberos tgt
+        dttr.expirationDate = UserGroupInformation.getLoginUser().doAs(
+            new PrivilegedExceptionAction<Long>() {
+
+              @Override
+              public Long run() throws Exception {
+                return dttr.token.renew(dttr.conf);
+              }
+            });
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("renewing for:" + token.getService() + ";newED="
+              + dttr.expirationDate);
+        }
+        setTimerForTokenRenewal(dttr, false);// set the next one
+      } catch (Exception e) {
+        LOG.error("Exception renewing token" + token + ". Not rescheduled", e);
+        removeFailedDelegationToken(dttr);
+      }
+    }
+  }
+  
+  /**
+   * find the soonest expiring token and set it for renew
+   */
+  private static void setTimerForTokenRenewal(
+      DelegationTokenToRenew token, boolean firstTime) {
+      
+    // calculate timer time
+    long now = System.currentTimeMillis();
+    long renewIn;
+    if(firstTime) {
+      renewIn = now;
+    } else {
+      long expiresIn = (token.expirationDate - now); 
+      renewIn = now + expiresIn - expiresIn/10; // little before expiration
+    }
+    
+    // need to create new timer every time
+    TimerTask tTask = new RenewalTimerTask(token);
+    token.setTimerTask(tTask); // keep reference to the timer
+
+    renewalTimer.schedule(token.timerTask, new Date(renewIn));
+  }
+
+  /**
+   * removing all tokens renewals
+   */
+  static public void close() {
+    renewalTimer.cancel();
+    delegationTokens.clear();
+  }
+  
+  // cancel a token
+  private static void cancelToken(DelegationTokenToRenew t) {
+    dtCancelThread.cancelToken(t.token, t.conf);
+  }
+  
+  /**
+   * removing failed DT
+   * @param jobId
+   */
+  private static void removeFailedDelegationToken(DelegationTokenToRenew t) {
+    TezJobID jobId = t.jobId;
+    if (LOG.isDebugEnabled())
+      LOG.debug("removing failed delegation token for jobid=" + jobId + 
+          ";t=" + t.token.getService());
+    delegationTokens.remove(t);
+    // cancel the timer
+    if(t.timerTask!=null)
+      t.timerTask.cancel();
+  }
+  
+  /**
+   * removing DT for completed jobs
+   * @param jobId
+   */
+  public static void removeDelegationTokenRenewalForJob(TezJobID jobId) {
+    synchronized (delegationTokens) {
+      Iterator<DelegationTokenToRenew> it = delegationTokens.iterator();
+      while(it.hasNext()) {
+        DelegationTokenToRenew dttr = it.next();
+        if (dttr.jobId.equals(jobId)) {
+          if (LOG.isDebugEnabled())
+            LOG.debug("removing delegation token for jobid=" + jobId + 
+                ";t=" + dttr.token.getService());
+
+          // cancel the timer
+          if(dttr.timerTask!=null)
+            dttr.timerTask.cancel();
+
+          // cancel the token
+          cancelToken(dttr);
+
+          it.remove();
+        }
+      }
+    }
+  }
+}

Added: incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/security/JobTokenIdentifier.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/security/JobTokenIdentifier.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/security/JobTokenIdentifier.java (added)
+++ incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/security/JobTokenIdentifier.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.common.security;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+
+/**
+ * The token identifier for job token
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class JobTokenIdentifier extends TokenIdentifier {
+  private Text jobid;
+  public final static Text KIND_NAME = new Text("mapreduce.job");
+  
+  /**
+   * Default constructor
+   */
+  public JobTokenIdentifier() {
+    this.jobid = new Text();
+  }
+
+  /**
+   * Create a job token identifier from a jobid
+   * @param jobid the jobid to use
+   */
+  public JobTokenIdentifier(Text jobid) {
+    this.jobid = jobid;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public Text getKind() {
+    return KIND_NAME;
+  }
+  
+  /** {@inheritDoc} */
+  @Override
+  public UserGroupInformation getUser() {
+    if (jobid == null || "".equals(jobid.toString())) {
+      return null;
+    }
+    return UserGroupInformation.createRemoteUser(jobid.toString());
+  }
+  
+  /**
+   * Get the jobid
+   * @return the jobid
+   */
+  public Text getJobId() {
+    return jobid;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    jobid.readFields(in);
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    jobid.write(out);
+  }
+
+  @InterfaceAudience.Private
+  public static class Renewer extends Token.TrivialRenewer {
+    @Override
+    protected Text getKind() {
+      return KIND_NAME;
+    }
+  }
+}

Added: incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/security/JobTokenSecretManager.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/security/JobTokenSecretManager.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/security/JobTokenSecretManager.java (added)
+++ incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/security/JobTokenSecretManager.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.common.security;
+
+import java.util.Map;
+import java.util.TreeMap;
+
+import javax.crypto.SecretKey;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.security.token.SecretManager;
+import org.apache.hadoop.security.token.Token;
+
+/**
+ * SecretManager for job token. It can be used to cache generated job tokens.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class JobTokenSecretManager extends SecretManager<JobTokenIdentifier> {
+  private final SecretKey masterKey;
+  private final Map<String, SecretKey> currentJobTokens;
+
+  /**
+   * Convert the byte[] to a secret key
+   * @param key the byte[] to create the secret key from
+   * @return the secret key
+   */
+  public static SecretKey createSecretKey(byte[] key) {
+    return SecretManager.createSecretKey(key);
+  }
+  
+  /**
+   * Compute the HMAC hash of the message using the key
+   * @param msg the message to hash
+   * @param key the key to use
+   * @return the computed hash
+   */
+  public static byte[] computeHash(byte[] msg, SecretKey key) {
+    return createPassword(msg, key);
+  }
+  
+  /**
+   * Default constructor
+   */
+  public JobTokenSecretManager() {
+    this.masterKey = generateSecret();
+    this.currentJobTokens = new TreeMap<String, SecretKey>();
+  }
+  
+  /**
+   * Create a new password/secret for the given job token identifier.
+   * @param identifier the job token identifier
+   * @return token password/secret
+   */
+  @Override
+  public byte[] createPassword(JobTokenIdentifier identifier) {
+    byte[] result = createPassword(identifier.getBytes(), masterKey);
+    return result;
+  }
+
+  /**
+   * Add the job token of a job to cache
+   * @param jobId the job that owns the token
+   * @param token the job token
+   */
+  public void addTokenForJob(String jobId, Token<JobTokenIdentifier> token) {
+    SecretKey tokenSecret = createSecretKey(token.getPassword());
+    synchronized (currentJobTokens) {
+      currentJobTokens.put(jobId, tokenSecret);
+    }
+  }
+
+  /**
+   * Remove the cached job token of a job from cache
+   * @param jobId the job whose token is to be removed
+   */
+  public void removeTokenForJob(String jobId) {
+    synchronized (currentJobTokens) {
+      currentJobTokens.remove(jobId);
+    }
+  }
+  
+  /**
+   * Look up the token password/secret for the given jobId.
+   * @param jobId the jobId to look up
+   * @return token password/secret as SecretKey
+   * @throws InvalidToken
+   */
+  public SecretKey retrieveTokenSecret(String jobId) throws InvalidToken {
+    SecretKey tokenSecret = null;
+    synchronized (currentJobTokens) {
+      tokenSecret = currentJobTokens.get(jobId);
+    }
+    if (tokenSecret == null) {
+      throw new InvalidToken("Can't find job token for job " + jobId + " !!");
+    }
+    return tokenSecret;
+  }
+  
+  /**
+   * Look up the token password/secret for the given job token identifier.
+   * @param identifier the job token identifier to look up
+   * @return token password/secret as byte[]
+   * @throws InvalidToken
+   */
+  @Override
+  public byte[] retrievePassword(JobTokenIdentifier identifier)
+      throws InvalidToken {
+    return retrieveTokenSecret(identifier.getJobId().toString()).getEncoded();
+  }
+
+  /**
+   * Create an empty job token identifier
+   * @return a newly created empty job token identifier
+   */
+  @Override
+  public JobTokenIdentifier createIdentifier() {
+    return new JobTokenIdentifier();
+  }
+}



Mime
View raw message